summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-02-19 16:06:22 +0000
committerTed Ross <tross@apache.org>2010-02-19 16:06:22 +0000
commit2530d09b874d23edf5ec9743d1e4ad1589bd4443 (patch)
tree2c790e64d8004e945ec60c039db3ef630d059b23
parent255334aa9b170f6ad8054677bdbdc5fd30ea2c1d (diff)
downloadqpid-python-2530d09b874d23edf5ec9743d1e4ad1589bd4443.tar.gz
Checking in work-in-progress for the qmf branch.
This branch will not compile until further changes are checked in. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@911854 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/include/qmf/Connection.h125
-rw-r--r--qpid/cpp/include/qmf/ConnectionSettings.h32
-rw-r--r--qpid/cpp/include/qmf/Protocol.h (renamed from qpid/cpp/src/qmf/engine/Protocol.h)35
-rw-r--r--qpid/cpp/include/qmf/engine/Agent.h85
-rw-r--r--qpid/cpp/include/qmf/engine/ConnectionSettings.h150
-rw-r--r--qpid/cpp/include/qmf/engine/Console.h14
-rw-r--r--qpid/cpp/include/qmf/engine/Message.h41
-rw-r--r--qpid/cpp/include/qmf/engine/Object.h23
-rw-r--r--qpid/cpp/include/qmf/engine/Query.h76
-rw-r--r--qpid/cpp/include/qmf/engine/ResilientConnection.h173
-rw-r--r--qpid/cpp/include/qmf/engine/Schema.h39
-rw-r--r--qpid/cpp/include/qmf/engine/Typecode.h53
-rw-r--r--qpid/cpp/include/qmf/engine/Value.h122
-rw-r--r--qpid/cpp/src/qmf.mk44
-rw-r--r--qpid/cpp/src/qmf/Protocol.cpp54
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp610
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.h22
-rw-r--r--qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp278
-rw-r--r--qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h63
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.h5
-rw-r--r--qpid/cpp/src/qmf/engine/MessageImpl.cpp43
-rw-r--r--qpid/cpp/src/qmf/engine/MessageImpl.h44
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectImpl.cpp201
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectImpl.h60
-rw-r--r--qpid/cpp/src/qmf/engine/Protocol.cpp52
-rw-r--r--qpid/cpp/src/qmf/engine/QueryImpl.cpp76
-rw-r--r--qpid/cpp/src/qmf/engine/QueryImpl.h84
-rw-r--r--qpid/cpp/src/qmf/engine/ResilientConnection.cpp514
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.cpp319
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.h87
-rw-r--r--qpid/cpp/src/qmf/engine/ValueImpl.cpp374
-rw-r--r--qpid/cpp/src/qmf/engine/ValueImpl.h160
32 files changed, 582 insertions, 3476 deletions
diff --git a/qpid/cpp/include/qmf/Connection.h b/qpid/cpp/include/qmf/Connection.h
deleted file mode 100644
index f648b1427f..0000000000
--- a/qpid/cpp/include/qmf/Connection.h
+++ /dev/null
@@ -1,125 +0,0 @@
-#ifndef _QmfConnection_
-#define _QmfConnection_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/QmfImportExport.h"
-#include "qmf/ConnectionSettings.h"
-
-namespace qmf {
-
- /**
- * Operational states for Connections.
- *
- * \ingroup qmfapi
- */
- enum ConnectionState {
- CONNECTION_UP = 1,
- CONNECTION_DOWN = 2
- };
-
- /**
- * Implement a subclass of ConnectionListener and provide it with the
- * Connection constructor to receive notification of changes in the
- * connection state.
- *
- * \ingroup qmfapi
- */
- class ConnectionListener {
- QMF_EXTERN virtual ~ConnectionListener();
-
- /**
- * Called each time the state of the connection changes.
- *
- * @param state the new state
- */
- virtual void newState(ConnectionState state);
-
- /**
- * Called if the connection requires input from an interactive client.
- *
- * @param prompt Text of the prompt - describes what information is required.
- * @param answer The interactive user input.
- * @param answerLen on Input - the maximum number of bytes that can be copied to answer.
- * on Output - the number of bytes copied to answer.
- */
- virtual void interactivePrompt(const char* prompt, char* answer, uint32_t answerLen);
- };
-
- class ConnectionImpl;
-
- /**
- * The Connection class represents a connection to a QPID broker that can
- * be used by agents and consoles, possibly multiple at the same time.
- *
- * \ingroup qmfapi
- */
- class Connection {
- public:
-
- /**
- * Creates a connection object and begins the process of attempting to
- * connect to the QPID broker.
- *
- * @param settings The settings that control how the connection is set
- * up.
- *
- * @param listener An optional pointer to a subclass of
- * ConnectionListener to receive notifications of events related to
- * this connection.
- */
- QMF_EXTERN Connection(const ConnectionSettings& settings,
- const ConnectionListener* listener = 0);
-
- /**
- * Destroys a connection, causing the connection to be closed.
- */
- QMF_EXTERN ~Connection();
-
- /**
- * Set the administrative state of the connection (enabled or disabled).
- *
- * @param enabled True => enable connection, False => disable connection
- */
- QMF_EXTERN void setAdminState(bool enabled);
-
- /**
- * Return the current operational state of the connection (up or down).
- *
- * @return the current connection state.
- */
- QMF_EXTERN ConnectionState getOperState() const;
-
- /**
- * Get the error message from the last failure to connect.
- *
- * @return Null-terminated string containing the error message.
- */
- QMF_EXTERN const char* getLastError() const;
-
- private:
- friend class AgentImpl;
- friend class ConsoleImpl;
- ConnectionImpl* impl;
- };
-
-}
-
-#endif
diff --git a/qpid/cpp/include/qmf/ConnectionSettings.h b/qpid/cpp/include/qmf/ConnectionSettings.h
deleted file mode 100644
index 11af73d797..0000000000
--- a/qpid/cpp/include/qmf/ConnectionSettings.h
+++ /dev/null
@@ -1,32 +0,0 @@
-#ifndef _QmfConnectionSettings_
-#define _QmfConnectionSettings_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace qmf {
- namespace engine {
- class ConnectionSettings;
- }
-
- typedef class engine::ConnectionSettings ConnectionSettings;
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/Protocol.h b/qpid/cpp/include/qmf/Protocol.h
index 1cdfa60c84..07870b0eb5 100644
--- a/qpid/cpp/src/qmf/engine/Protocol.h
+++ b/qpid/cpp/include/qmf/Protocol.h
@@ -1,5 +1,5 @@
-#ifndef _QmfEngineProtocol_
-#define _QmfEngineProtocol_
+#ifndef _QmfProtocol_
+#define _QmfProtocol_
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -21,21 +21,40 @@
*/
#include <qpid/sys/IntegerTypes.h>
+#include <string>
namespace qpid {
- namespace framing {
- class Buffer;
+ namespace messaging {
+ class Message;
}
}
namespace qmf {
-namespace engine {
class Protocol {
public:
- static bool checkHeader(qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- static void encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ //static bool checkHeader(const qpid::messaging::Message& msg, std::string& opcode, uint32_t *seq);
+ //static void encodeHeader(qpid::messaging::Message& msg, const std::string& opcode, uint32_t seq = 0);
+ const static std::string SCHEMA_ELT_NAME;
+ const static std::string SCHEMA_ELT_TYPE;
+ const static std::string SCHEMA_ELT_DIR;
+ const static std::string SCHEMA_ELT_UNIT;
+ const static std::string SCHEMA_ELT_DESC;
+ const static std::string SCHEMA_ELT_ACCESS;
+ const static std::string SCHEMA_ELT_OPTIONAL;
+ const static std::string SCHEMA_ARGS;
+ const static std::string SCHEMA_PACKAGE;
+ const static std::string SCHEMA_CLASS_KIND;
+ const static std::string SCHEMA_CLASS_KIND_DATA;
+ const static std::string SCHEMA_CLASS_KIND_EVENT;
+ const static std::string SCHEMA_CLASS;
+ const static std::string SCHEMA_HASH;
+ const static std::string AGENT_NAME;
+ const static std::string OBJECT_NAME;
+ const static std::string SCHEMA_ID;
+
+ /*
const static uint8_t OP_ATTACH_REQUEST = 'A';
const static uint8_t OP_ATTACH_RESPONSE = 'a';
@@ -60,10 +79,10 @@ namespace engine {
const static uint8_t OP_PROPERTY_INDICATION = 'c';
const static uint8_t OP_STATISTIC_INDICATION = 'i';
const static uint8_t OP_EVENT_INDICATION = 'e';
+ */
};
}
-}
#endif
diff --git a/qpid/cpp/include/qmf/engine/Agent.h b/qpid/cpp/include/qmf/engine/Agent.h
index 71abf82254..c645b3cc01 100644
--- a/qpid/cpp/include/qmf/engine/Agent.h
+++ b/qpid/cpp/include/qmf/engine/Agent.h
@@ -25,8 +25,8 @@
#include <qmf/engine/Object.h>
#include <qmf/engine/Event.h>
#include <qmf/engine/Query.h>
-#include <qmf/engine/Value.h>
-#include <qmf/engine/Message.h>
+#include <qpid/messaging/Connection.h>
+#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
@@ -42,12 +42,7 @@ namespace engine {
GET_QUERY = 1,
START_SYNC = 2,
END_SYNC = 3,
- METHOD_CALL = 4,
- DECLARE_QUEUE = 5,
- DELETE_QUEUE = 6,
- BIND = 7,
- UNBIND = 8,
- SETUP_COMPLETE = 9
+ METHOD_CALL = 4
};
EventKind kind;
@@ -55,13 +50,11 @@ namespace engine {
char* authUserId; // Authenticated user ID (for all kinds)
char* authToken; // Authentication token if issued (for all kinds)
char* name; // Name of the method/sync query
- // (METHOD_CALL, START_SYNC, END_SYNC, DECLARE_QUEUE, BIND, UNBIND)
+ // (METHOD_CALL, START_SYNC, END_SYNC)
Object* object; // Object involved in method call (METHOD_CALL)
- ObjectId* objectId; // ObjectId for method call (METHOD_CALL)
+ char* objectKey; // Object key for method call (METHOD_CALL)
Query* query; // Query parameters (GET_QUERY, START_SYNC)
- Value* arguments; // Method parameters (METHOD_CALL)
- char* exchange; // Exchange for bind (BIND, UNBIND)
- char* bindingKey; // Key for bind (BIND, UNBIND)
+ qpid::messaging::Variant::Map* arguments; // Method parameters (METHOD_CALL)
const SchemaObjectClass* objectClass; // (METHOD_CALL)
};
@@ -72,10 +65,17 @@ namespace engine {
*/
class Agent {
public:
- Agent(char* label, bool internalStore=true);
+ Agent(const char* vendor, const char* product, const char* name, const char* domain=0, bool internalStore=true);
~Agent();
/**
+ * Set an agent attribute that can be used to describe this agent to consoles.
+ *@param key Null-terminated string that is the name of the attribute.
+ *@param value Variant value (or any API type) of the attribute.
+ */
+ void setAttr(const char* key, const qpid::messaging::Variant& value);
+
+ /**
* Configure the directory path for storing persistent data.
*@param path Null-terminated string containing a directory path where files can be
* created, written, and read. If NULL, no persistent storage will be
@@ -92,24 +92,6 @@ namespace engine {
void setTransferDir(const char* path);
/**
- * Pass messages received from the AMQP session to the Agent engine.
- *@param message AMQP messages received on the agent session.
- */
- void handleRcvMessage(Message& message);
-
- /**
- * Get the next message to be sent to the AMQP network.
- *@param item The Message structure describing the message to be produced.
- *@return true if the Message is valid, false if there are no messages to send.
- */
- bool getXmtMessage(Message& item) const;
-
- /**
- * Remove and discard one message from the head of the transmit queue.
- */
- void popXmt();
-
- /**
* Get the next application event from the agent engine.
*@param event The event iff the return value is true
*@return true if event is valid, false if there are no events to process
@@ -122,20 +104,9 @@ namespace engine {
void popEvent();
/**
- * A new AMQP session has been established for Agent communication.
- */
- void newSession();
-
- /**
- * Start the QMF Agent protocol. This should be invoked after a SETUP_COMPLETE event
- * is received from the Agent engine.
- */
- void startProtocol();
-
- /**
- * This method is called periodically so the agent can supply a heartbeat.
+ * Provide the AMQP connection to be used for this agent.
*/
- void heartbeat();
+ void setConnection(qpid::messaging::Connection& conn);
/**
* Respond to a method request.
@@ -144,7 +115,7 @@ namespace engine {
*@param text Status text ("OK" or an error message)
*@param arguments The list of output arguments from the method call.
*/
- void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
+ void methodResponse(uint32_t sequence, uint32_t status, char* text, const qpid::messaging::Variant::Map& arguments);
/**
* Send a content indication to the QMF bus. This is only needed for objects that are
@@ -152,10 +123,8 @@ namespace engine {
* (inserted using addObject).
*@param sequence The sequence number of the GET request or the SYNC_START request.
*@param object The object (annotated with "changed" flags) for publication.
- *@param prop If true, changed object properties are transmitted.
- *@param stat If true, changed object statistics are transmitted.
*/
- void queryResponse(uint32_t sequence, Object& object, bool prop = true, bool stat = true);
+ void queryResponse(uint32_t sequence, Object& object);
/**
* Indicate the completion of a query. This is not used for SYNC_START requests.
@@ -178,20 +147,12 @@ namespace engine {
/**
* Give an object to the Agent for storage and management. Once added, the agent takes
* responsibility for the life cycle of the object.
- *@param obj The object to be managed by the Agent.
- *@param persistId A unique non-zero value if the object-id is to be persistent.
- *@return The objectId of the managed object.
- */
- const ObjectId* addObject(Object& obj, uint64_t persistId);
- // const ObjectId* addObject(Object& obj, uint32_t persistIdLo, uint32_t persistIdHi);
-
- /**
- * Allocate an object-id for an object that will be managed by the application.
- *@param persistId A unique non-zero value if the object-id is to be persistent.
- *@return The objectId structure for the allocated ID.
+ *@param obj The object to be managed by the Agent.
+ *@param key A unique name (a primary key) to be used to address this object. If
+ * left null, the agent will create a unique name for the object.
+ *@return The key for the managed object.
*/
- const ObjectId* allocObjectId(uint64_t persistId);
- const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
+ const char* addObject(Object& obj, const char* key=0);
/**
* Raise an event into the QMF network..
diff --git a/qpid/cpp/include/qmf/engine/ConnectionSettings.h b/qpid/cpp/include/qmf/engine/ConnectionSettings.h
deleted file mode 100644
index 36312400b1..0000000000
--- a/qpid/cpp/include/qmf/engine/ConnectionSettings.h
+++ /dev/null
@@ -1,150 +0,0 @@
-#ifndef _QmfEngineConnectionSettings_
-#define _QmfEngineConnectionSettings_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/QmfEngineImportExport.h"
-#include "qpid/sys/IntegerTypes.h"
-
-namespace qmf {
-namespace engine {
-
- class ConnectionSettingsImpl;
- class Value;
-
- /**
- * Settings for AMQP connections to the broker.
- *
- * \ingroup qmfapi
- */
- class ConnectionSettings {
- public:
-
- /**
- * Create a set of default connection settings.
- *
- * If no further attributes are set, the settings will cause a connection to be made to
- * the default broker (on localhost or at a host/port supplied by service discovery) and
- * authentication will be the best-available (GSSAPI/Kerberos, Anonymous, Plain with prompts
- * for username and password).
- */
- QMFE_EXTERN ConnectionSettings();
-
- /**
- * Create a set of connection settings by URL.
- *
- * @param url Universal resource locator describing the broker address and additional attributes.
- *
- * The URL is of the form:
- * amqp[s]://host[:port][?key=value[&key=value]*]
- *
- * For example:
- * amqp://localhost
- * amqp://broker?transport=rdma&authmech=GSSAPI&authservice=qpidd
- * amqps://broker?authmech=PLAIN&authuser=guest&authpass=guest
- */
- QMFE_EXTERN ConnectionSettings(const char* url);
-
- /**
- * Copy Constructor.
- */
- ConnectionSettings(const ConnectionSettings& from);
-
- /**
- * Destroy the connection settings object.
- */
- QMFE_EXTERN ~ConnectionSettings();
-
- /**
- * Set an attribute to control connection setup.
- *
- * @param key A null-terminated string that is an attribute name.
- *
- * @param value Reference to a value to be stored as the attribute. The type of the value
- * is specific to the key.
- *
- * @return True if success, False if invalid attribute
- */
- QMFE_EXTERN bool setAttr(const char* key, const Value& value);
-
- /**
- * Get the value of an attribute.
- *
- * @param key A null-terminated attribute name.
- *
- * @return The value associated with the attribute name.
- */
- QMFE_EXTERN Value getAttr(const char* key) const;
-
- /**
- * Get the attribute string (the portion of the URL following the '?') for the settings.
- *
- * @return A pointer to the attribute string. If the content of this string needs to be
- * available beyond the scope of the calling function, it should be copied. The
- * returned pointer may become invalid if the set of attributes is changed.
- */
- QMFE_EXTERN const char* getAttrString() const;
-
- /**
- * Shortcuts for setting the transport for the connection.
- *
- * @param port The port value for the connection address.
- */
- QMFE_EXTERN void transportTcp(uint16_t port = 5672);
- QMFE_EXTERN void transportSsl(uint16_t port = 5671);
- QMFE_EXTERN void transportRdma(uint16_t port = 5672);
-
- /**
- * Shortcuts for setting authentication mechanisms.
- *
- * @param username Null-terminated authentication user name.
- *
- * @param password Null-terminated authentication password.
- *
- * @param serviceName Null-terminated GSSAPI service name (Kerberos service principal)
- *
- * @param minSsf Minimum security factor for connections. 0 = encryption not required.
- *
- * @param maxSsf Maximum security factor for connections. 0 = encryption not permitted.
- */
- QMFE_EXTERN void authAnonymous(const char* username = 0);
- QMFE_EXTERN void authPlain(const char* username = 0, const char* password = 0);
- QMFE_EXTERN void authGssapi(const char* serviceName, uint32_t minSsf = 0, uint32_t maxSsf = 256);
-
- /**
- * Shortcut for setting connection retry attributes.
- *
- * @param delayMin Minimum delay (in seconds) between connection attempts.
- *
- * @param delaxMax Maximum delay (in seconds) between connection attempts.
- *
- * @param delayFactor Factor to multiply the delay by between failed connection attempts.
- */
- QMFE_EXTERN void setRetry(int delayMin = 1, int delayMax = 128, int delayFactor = 2);
-
- private:
- friend class ResilientConnectionImpl;
- ConnectionSettingsImpl* impl;
- };
-
-}
-}
-
-#endif
diff --git a/qpid/cpp/include/qmf/engine/Console.h b/qpid/cpp/include/qmf/engine/Console.h
index 03b3993395..b99d63b9a6 100644
--- a/qpid/cpp/include/qmf/engine/Console.h
+++ b/qpid/cpp/include/qmf/engine/Console.h
@@ -20,14 +20,12 @@
* under the License.
*/
-#include <qmf/engine/ResilientConnection.h>
#include <qmf/engine/Schema.h>
#include <qmf/engine/ObjectId.h>
#include <qmf/engine/Object.h>
#include <qmf/engine/Event.h>
#include <qmf/engine/Query.h>
-#include <qmf/engine/Value.h>
-#include <qmf/engine/Message.h>
+#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
@@ -49,8 +47,8 @@ namespace engine {
MethodResponse(const MethodResponse& from);
~MethodResponse();
uint32_t getStatus() const;
- const Value* getException() const;
- const Value* getArgs() const;
+ const qpid::messaging::Variant* getException() const;
+ const qpid::messaging::Variant::Map* getArgs() const;
private:
friend struct MethodResponseImpl;
@@ -66,7 +64,7 @@ namespace engine {
public:
~QueryResponse();
uint32_t getStatus() const;
- const Value* getException() const;
+ const qpid::messaging::Variant* getException() const;
uint32_t getObjectCount() const;
const Object* getObject(uint32_t idx) const;
@@ -161,10 +159,6 @@ namespace engine {
void sessionClosed();
void startProtocol();
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
-
bool getEvent(BrokerEvent& event) const;
void popEvent();
diff --git a/qpid/cpp/include/qmf/engine/Message.h b/qpid/cpp/include/qmf/engine/Message.h
deleted file mode 100644
index 1e95cc6afe..0000000000
--- a/qpid/cpp/include/qmf/engine/Message.h
+++ /dev/null
@@ -1,41 +0,0 @@
-#ifndef _QmfEngineMessage_
-#define _QmfEngineMessage_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qpid/sys/IntegerTypes.h"
-
-namespace qmf {
-namespace engine {
-
- struct Message {
- char* body;
- uint32_t length;
- char* destination;
- char* routingKey;
- char* replyExchange;
- char* replyKey;
- char* userId;
- };
-
-}
-}
-
-#endif
diff --git a/qpid/cpp/include/qmf/engine/Object.h b/qpid/cpp/include/qmf/engine/Object.h
index ad67cfdb95..81dbf45521 100644
--- a/qpid/cpp/include/qmf/engine/Object.h
+++ b/qpid/cpp/include/qmf/engine/Object.h
@@ -22,7 +22,7 @@
#include <qmf/engine/Schema.h>
#include <qmf/engine/ObjectId.h>
-#include <qmf/engine/Value.h>
+#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
@@ -30,23 +30,26 @@ namespace engine {
struct ObjectImpl;
class Object {
public:
- Object(const SchemaObjectClass* type);
+ Object();
+ Object(SchemaObjectClass* type);
Object(const Object& from);
virtual ~Object();
+ const qpid::messaging::Variant::Map& getValues() const;
+ qpid::messaging::Variant::Map& getValues();
+
+ const SchemaObjectClass* getSchema() const;
+ void setSchema(SchemaObjectClass* schema);
+
+ const char* getKey() const;
+ void setKey(const char* key);
+
+ void touch();
void destroy();
- const ObjectId* getObjectId() const;
- void setObjectId(ObjectId* oid);
- const SchemaObjectClass* getClass() const;
- Value* getValue(const char* key) const;
- void invokeMethod(const char* methodName, const Value* inArgs, void* context) const;
- bool isDeleted() const;
- void merge(const Object& from);
private:
friend struct ObjectImpl;
friend class AgentImpl;
- Object(ObjectImpl* impl);
ObjectImpl* impl;
};
}
diff --git a/qpid/cpp/include/qmf/engine/Query.h b/qpid/cpp/include/qmf/engine/Query.h
index 3ed08c5d8e..08e4ebf3cc 100644
--- a/qpid/cpp/include/qmf/engine/Query.h
+++ b/qpid/cpp/include/qmf/engine/Query.h
@@ -20,89 +20,39 @@
* under the License.
*/
-#include <qmf/engine/ObjectId.h>
-#include <qmf/engine/Value.h>
+#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
class Object;
- struct QueryElementImpl;
- struct QueryImpl;
- struct QueryExpressionImpl;
- class SchemaClassKey;
-
- enum ValueOper {
- O_EQ = 1,
- O_NE = 2,
- O_LT = 3,
- O_LE = 4,
- O_GT = 5,
- O_GE = 6,
- O_RE_MATCH = 7,
- O_RE_NOMATCH = 8,
- O_PRESENT = 9,
- O_NOT_PRESENT = 10
- };
-
- struct QueryOperand {
- virtual ~QueryOperand() {}
- virtual bool evaluate(const Object* object) const = 0;
- };
-
- struct QueryElement : public QueryOperand {
- QueryElement(const char* attrName, const Value* value, ValueOper oper);
- QueryElement(QueryElementImpl* impl);
- virtual ~QueryElement();
- bool evaluate(const Object* object) const;
-
- QueryElementImpl* impl;
- };
-
- enum ExprOper {
- E_NOT = 1,
- E_AND = 2,
- E_OR = 3,
- E_XOR = 4
- };
-
- struct QueryExpression : public QueryOperand {
- QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2);
- QueryExpression(QueryExpressionImpl* impl);
- virtual ~QueryExpression();
- bool evaluate(const Object* object) const;
-
- QueryExpressionImpl* impl;
- };
+ class QueryImpl;
class Query {
public:
- Query(const char* className, const char* packageName);
- Query(const SchemaClassKey* key);
- Query(const ObjectId* oid);
+ Query(const char* target);
+ Query(const char* target, const qpid::messaging::Variant::List& predicate);
+ Query(const char* target, const char* expression);
Query(const Query& from);
~Query();
- void setSelect(const QueryOperand* criterion);
- void setLimit(uint32_t maxResults);
- void setOrderBy(const char* attrName, bool decreasing);
-
- const char* getPackage() const;
- const char* getClass() const;
- const ObjectId* getObjectId() const;
+ void where(const qpid::messaging::Variant::List& predicate);
+ void where(const char* expression);
+ void limit(uint32_t maxResults);
+ void orderBy(const char* attrName, bool decreasing);
- bool haveSelect() const;
+ bool havePredicate() const;
bool haveLimit() const;
bool haveOrderBy() const;
- const QueryOperand* getSelect() const;
+ const qpid::messaging::Variant::List& getPredicate() const;
uint32_t getLimit() const;
const char* getOrderBy() const;
bool getDecreasing() const;
+ bool matches(const Object& object) const;
+
private:
friend struct QueryImpl;
- friend class BrokerProxyImpl;
- Query(QueryImpl* impl);
QueryImpl* impl;
};
}
diff --git a/qpid/cpp/include/qmf/engine/ResilientConnection.h b/qpid/cpp/include/qmf/engine/ResilientConnection.h
deleted file mode 100644
index c03d08cb96..0000000000
--- a/qpid/cpp/include/qmf/engine/ResilientConnection.h
+++ /dev/null
@@ -1,173 +0,0 @@
-#ifndef _QmfEngineResilientConnection_
-#define _QmfEngineResilientConnection_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/Message.h>
-#include <qmf/engine/ConnectionSettings.h>
-#include <string>
-
-namespace qmf {
-namespace engine {
-
- class ResilientConnectionImpl;
-
- /**
- * Represents events that occur, unsolicited, from ResilientConnection.
- */
- struct ResilientConnectionEvent {
- enum EventKind {
- CONNECTED = 1,
- DISCONNECTED = 2,
- SESSION_CLOSED = 3,
- RECV = 4
- };
-
- EventKind kind;
- void* sessionContext; // SESSION_CLOSED, RECV
- char* errorText; // DISCONNECTED, SESSION_CLOSED
- Message message; // RECV
- };
-
- class SessionHandle {
- friend class ResilientConnectionImpl;
- void* impl;
- };
-
- /**
- * ResilientConnection represents a Qpid connection that is resilient.
- *
- * Upon creation, ResilientConnection attempts to establish a connection to the
- * messaging broker. If it fails, it will continue to retry at an interval that
- * increases over time (to a maximum interval). If an extablished connection is
- * dropped, a reconnect will be attempted.
- */
- class ResilientConnection {
- public:
-
- /**
- * Create a new resilient connection.
- *@param settings Settings that define how the connection is to be made.
- *@param delayMin Minimum delay (in seconds) between retries.
- *@param delayMax Maximum delay (in seconds) between retries.
- *@param delayFactor Factor to multiply retry delay by after each failure.
- */
- ResilientConnection(const ConnectionSettings& settings);
- ~ResilientConnection();
-
- /**
- * Get the connected status of the resilient connection.
- *@return true iff the connection is established.
- */
- bool isConnected() const;
-
- /**
- * Get the next event (if present) from the connection.
- *@param event Returned event if one is available.
- *@return true if event is valid, false if there are no more events to handle.
- */
- bool getEvent(ResilientConnectionEvent& event);
-
- /**
- * Discard the event on the front of the queue. This should be invoked after processing
- * the event from getEvent.
- */
- void popEvent();
-
- /**
- * Create a new AMQP session.
- *@param name Unique name for the session.
- *@param sessionContext Optional user-context value that will be provided in events
- * pertaining to this session.
- *@param handle Output handle to be stored and used in subsequent calls pertaining to
- * this session.
- *@return true iff the session was successfully created.
- */
- bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
-
- /**
- * Destroy a created session.
- *@param handle SessionHandle returned by createSession.
- */
- void destroySession(SessionHandle handle);
-
- /**
- * Send a message into the AMQP broker via a session.
- *@param handle The session handle of the session to transmit through.
- *@param message The QMF message to transmit.
- */
- void sendMessage(SessionHandle handle, Message& message);
-
- /**
- * Declare an exclusive, auto-delete queue for a session.
- *@param handle The session handle for the owner of the queue.
- *@param queue The name of the queue.
- */
- void declareQueue(SessionHandle handle, char* queue);
-
- /**
- * Delete a queue.
- *@param handle The session handle for the owner of the queue.
- *@param queue The name of the queue.
- */
- void deleteQueue(SessionHandle handle, char* queue);
-
- /**
- * Bind a queue to an exchange.
- *@param handle The session handle of the session to use for binding.
- *@param exchange The name of the exchange for binding.
- *@param queue The name of the queue for binding.
- *@param key The binding key.
- */
- void bind(SessionHandle handle, char* exchange, char* queue, char* key);
-
- /**
- * Remove a binding.
- *@param handle The session handle of the session to use for un-binding.
- *@param exchange The name of the exchange.
- *@param queue The name of the queue.
- *@param key The binding key.
- */
- void unbind(SessionHandle handle, char* exchange, char* queue, char* key);
-
- /**
- * Establish a file descriptor for event notification.
- *@param fd A file descriptor into which the connection shall write a character each
- * time an event is enqueued. This fd may be in a pair, the other fd of which
- * is used in a select loop to control execution.
- */
- void setNotifyFd(int fd);
-
- /**
- * Send a byte into the notify file descriptor.
- *
- * This can be used to wake up the event processing portion of the engine from either the
- * wrapped implementation or the engine itself.
- */
- void notify();
-
- private:
- ResilientConnectionImpl* impl;
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/include/qmf/engine/Schema.h b/qpid/cpp/include/qmf/engine/Schema.h
index f53e84324a..18a4cef8de 100644
--- a/qpid/cpp/include/qmf/engine/Schema.h
+++ b/qpid/cpp/include/qmf/engine/Schema.h
@@ -20,8 +20,8 @@
* under the License.
*/
-#include <qmf/engine/Typecode.h>
#include <qpid/sys/IntegerTypes.h>
+#include <qpid/messaging/Variant.h>
namespace qmf {
namespace engine {
@@ -29,12 +29,10 @@ namespace engine {
enum Access { ACCESS_READ_CREATE = 1, ACCESS_READ_WRITE = 2, ACCESS_READ_ONLY = 3 };
enum Direction { DIR_IN = 1, DIR_OUT = 2, DIR_IN_OUT = 3 };
enum ClassKind { CLASS_OBJECT = 1, CLASS_EVENT = 2 };
- enum Severity { SEV_EMERG = 0, SEV_ALERT = 1, SEV_CRIT = 2, SEV_ERROR = 3, SEV_WARN = 4, SEV_NOTICE = 5, SEV_INFORM = 6, SEV_DEBUG = 7 };
struct SchemaArgumentImpl;
struct SchemaMethodImpl;
struct SchemaPropertyImpl;
- struct SchemaStatisticImpl;
struct SchemaObjectClassImpl;
struct SchemaEventClassImpl;
struct SchemaClassKeyImpl;
@@ -43,14 +41,14 @@ namespace engine {
*/
class SchemaArgument {
public:
- SchemaArgument(const char* name, Typecode typecode);
+ SchemaArgument(const char* name, qpid::messaging::VariantType typecode);
SchemaArgument(const SchemaArgument& from);
~SchemaArgument();
void setDirection(Direction dir);
void setUnit(const char* val);
void setDesc(const char* desc);
const char* getName() const;
- Typecode getType() const;
+ qpid::messaging::VariantType getType() const;
Direction getDirection() const;
const char* getUnit() const;
const char* getDesc() const;
@@ -89,7 +87,7 @@ namespace engine {
*/
class SchemaProperty {
public:
- SchemaProperty(const char* name, Typecode typecode);
+ SchemaProperty(const char* name, qpid::messaging::VariantType typecode);
SchemaProperty(const SchemaProperty& from);
~SchemaProperty();
void setAccess(Access access);
@@ -98,7 +96,7 @@ namespace engine {
void setUnit(const char* val);
void setDesc(const char* desc);
const char* getName() const;
- Typecode getType() const;
+ qpid::messaging::VariantType getType() const;
Access getAccess() const;
bool isIndex() const;
bool isOptional() const;
@@ -114,27 +112,6 @@ namespace engine {
/**
*/
- class SchemaStatistic {
- public:
- SchemaStatistic(const char* name, Typecode typecode);
- SchemaStatistic(const SchemaStatistic& from);
- ~SchemaStatistic();
- void setUnit(const char* val);
- void setDesc(const char* desc);
- const char* getName() const;
- Typecode getType() const;
- const char* getUnit() const;
- const char* getDesc() const;
-
- private:
- friend struct SchemaStatisticImpl;
- friend struct SchemaObjectClassImpl;
- SchemaStatistic(SchemaStatisticImpl* impl);
- SchemaStatisticImpl* impl;
- };
-
- /**
- */
class SchemaClassKey {
public:
SchemaClassKey(const SchemaClassKey& from);
@@ -164,15 +141,12 @@ namespace engine {
SchemaObjectClass(const SchemaObjectClass& from);
~SchemaObjectClass();
void addProperty(const SchemaProperty* property);
- void addStatistic(const SchemaStatistic* statistic);
void addMethod(const SchemaMethod* method);
const SchemaClassKey* getClassKey() const;
int getPropertyCount() const;
- int getStatisticCount() const;
int getMethodCount() const;
const SchemaProperty* getProperty(int idx) const;
- const SchemaStatistic* getStatistic(int idx) const;
const SchemaMethod* getMethod(int idx) const;
private:
@@ -187,14 +161,13 @@ namespace engine {
*/
class SchemaEventClass {
public:
- SchemaEventClass(const char* package, const char* name, Severity severity);
+ SchemaEventClass(const char* package, const char* name);
SchemaEventClass(const SchemaEventClass& from);
~SchemaEventClass();
void addArgument(const SchemaArgument* argument);
void setDesc(const char* desc);
const SchemaClassKey* getClassKey() const;
- Severity getSeverity() const;
int getArgumentCount() const;
const SchemaArgument* getArgument(int idx) const;
diff --git a/qpid/cpp/include/qmf/engine/Typecode.h b/qpid/cpp/include/qmf/engine/Typecode.h
deleted file mode 100644
index 613f96a483..0000000000
--- a/qpid/cpp/include/qmf/engine/Typecode.h
+++ /dev/null
@@ -1,53 +0,0 @@
-#ifndef _QmfEngineTypecode_
-#define _QmfEngineTypecode_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-namespace qmf {
-namespace engine {
-
- enum Typecode {
- TYPE_UINT8 = 1,
- TYPE_UINT16 = 2,
- TYPE_UINT32 = 3,
- TYPE_UINT64 = 4,
- TYPE_SSTR = 6,
- TYPE_LSTR = 7,
- TYPE_ABSTIME = 8,
- TYPE_DELTATIME = 9,
- TYPE_REF = 10,
- TYPE_BOOL = 11,
- TYPE_FLOAT = 12,
- TYPE_DOUBLE = 13,
- TYPE_UUID = 14,
- TYPE_MAP = 15,
- TYPE_INT8 = 16,
- TYPE_INT16 = 17,
- TYPE_INT32 = 18,
- TYPE_INT64 = 19,
- TYPE_OBJECT = 20,
- TYPE_LIST = 21,
- TYPE_ARRAY = 22
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/include/qmf/engine/Value.h b/qpid/cpp/include/qmf/engine/Value.h
deleted file mode 100644
index 5b45061b78..0000000000
--- a/qpid/cpp/include/qmf/engine/Value.h
+++ /dev/null
@@ -1,122 +0,0 @@
-#ifndef _QmfEngineValue_
-#define _QmfEngineValue_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/ObjectId.h>
-#include <qmf/engine/Typecode.h>
-
-namespace qmf {
-namespace engine {
-
- class Object;
- struct ValueImpl;
-
- class Value {
- public:
- // Value();
- Value(const Value& from);
- Value(Typecode t, Typecode arrayType = TYPE_UINT8);
- ~Value();
-
- Typecode getType() const;
- bool isNull() const;
- void setNull();
-
- bool isObjectId() const;
- const ObjectId& asObjectId() const;
- void setObjectId(const ObjectId& oid);
-
- bool isUint() const;
- uint32_t asUint() const;
- void setUint(uint32_t val);
-
- bool isInt() const;
- int32_t asInt() const;
- void setInt(int32_t val);
-
- bool isUint64() const;
- uint64_t asUint64() const;
- void setUint64(uint64_t val);
-
- bool isInt64() const;
- int64_t asInt64() const;
- void setInt64(int64_t val);
-
- bool isString() const;
- const char* asString() const;
- void setString(const char* val);
-
- bool isBool() const;
- bool asBool() const;
- void setBool(bool val);
-
- bool isFloat() const;
- float asFloat() const;
- void setFloat(float val);
-
- bool isDouble() const;
- double asDouble() const;
- void setDouble(double val);
-
- bool isUuid() const;
- const uint8_t* asUuid() const;
- void setUuid(const uint8_t* val);
-
- bool isObject() const;
- const Object* asObject() const;
- void setObject(Object* val);
-
- bool isMap() const;
- bool keyInMap(const char* key) const;
- Value* byKey(const char* key);
- const Value* byKey(const char* key) const;
- void deleteKey(const char* key);
- void insert(const char* key, Value* val);
- uint32_t keyCount() const;
- const char* key(uint32_t idx) const;
-
- bool isList() const;
- uint32_t listItemCount() const;
- Value* listItem(uint32_t idx);
- void appendToList(Value* val);
- void deleteListItem(uint32_t idx);
-
- bool isArray() const;
- Typecode arrayType() const;
- uint32_t arrayItemCount() const;
- Value* arrayItem(uint32_t idx);
- void appendToArray(Value* val);
- void deleteArrayItem(uint32_t idx);
-
- private:
- friend struct ValueImpl;
- friend class BrokerProxyImpl;
- friend struct ObjectImpl;
- friend struct EventImpl;
- friend class AgentImpl;
- Value(ValueImpl* impl);
- ValueImpl* impl;
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk
index 1e4c59b19e..34f93c63ed 100644
--- a/qpid/cpp/src/qmf.mk
+++ b/qpid/cpp/src/qmf.mk
@@ -31,9 +31,7 @@ QMF_API = \
../include/qpid/agent/ManagementAgent.h \
../include/qpid/agent/QmfAgentImportExport.h \
../include/qmf/Agent.h \
- ../include/qmf/Connection.h \
../include/qmf/QmfImportExport.h \
- ../include/qmf/ConnectionSettings.h \
../include/qmf/AgentObject.h
#
@@ -41,18 +39,14 @@ QMF_API = \
#
QMF_ENGINE_API = \
../include/qmf/engine/Agent.h \
- ../include/qmf/engine/ConnectionSettings.h \
../include/qmf/engine/Console.h \
../include/qmf/engine/Event.h \
- ../include/qmf/engine/Message.h \
../include/qmf/engine/Object.h \
- ../include/qmf/engine/ObjectId.h \
../include/qmf/engine/QmfEngineImportExport.h \
../include/qmf/engine/Query.h \
- ../include/qmf/engine/ResilientConnection.h \
- ../include/qmf/engine/Schema.h \
- ../include/qmf/engine/Typecode.h \
- ../include/qmf/engine/Value.h
+ ../include/qmf/engine/Schema.h
+
+# ../include/qmf/engine/ObjectId.h
# Public header files
nobase_include_HEADERS += \
@@ -67,31 +61,23 @@ libqmf_la_SOURCES = \
libqmfengine_la_SOURCES = \
$(QMF_ENGINE_API) \
qmf/engine/Agent.cpp \
- qmf/engine/BrokerProxyImpl.cpp \
- qmf/engine/BrokerProxyImpl.h \
- qmf/engine/ConnectionSettingsImpl.cpp \
- qmf/engine/ConnectionSettingsImpl.h \
- qmf/engine/ConsoleImpl.cpp \
- qmf/engine/ConsoleImpl.h \
- qmf/engine/EventImpl.cpp \
- qmf/engine/EventImpl.h \
- qmf/engine/MessageImpl.cpp \
- qmf/engine/MessageImpl.h \
- qmf/engine/ObjectIdImpl.cpp \
- qmf/engine/ObjectIdImpl.h \
qmf/engine/ObjectImpl.cpp \
qmf/engine/ObjectImpl.h \
- qmf/engine/Protocol.cpp \
- qmf/engine/Protocol.h \
+ qmf/Protocol.cpp \
+ qmf/Protocol.h \
qmf/engine/QueryImpl.cpp \
qmf/engine/QueryImpl.h \
- qmf/engine/ResilientConnection.cpp \
- qmf/engine/SequenceManager.cpp \
- qmf/engine/SequenceManager.h \
qmf/engine/SchemaImpl.cpp \
- qmf/engine/SchemaImpl.h \
- qmf/engine/ValueImpl.cpp \
- qmf/engine/ValueImpl.h
+ qmf/engine/SchemaImpl.h
+
+# qmf/engine/BrokerProxyImpl.cpp
+# qmf/engine/BrokerProxyImpl.h
+# qmf/engine/ConsoleImpl.cpp
+# qmf/engine/ConsoleImpl.h
+# qmf/engine/ObjectIdImpl.cpp
+# qmf/engine/ObjectIdImpl.h
+# qmf/engine/SequenceManager.cpp
+# qmf/engine/SequenceManager.h
libqmf_la_LIBADD = libqmfengine.la
libqmfengine_la_LIBADD = libqpidclient.la
diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/Protocol.cpp
new file mode 100644
index 0000000000..154081c9d4
--- /dev/null
+++ b/qpid/cpp/src/qmf/Protocol.cpp
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/Protocol.h"
+
+using namespace std;
+using namespace qmf;
+
+const string Protocol::SCHEMA_ELT_NAME("name");
+const string Protocol::SCHEMA_ELT_TYPE("type");
+const string Protocol::SCHEMA_ELT_DIR("dir");
+const string Protocol::SCHEMA_ELT_UNIT("unit");
+const string Protocol::SCHEMA_ELT_DESC("desc");
+const string Protocol::SCHEMA_ELT_ACCESS("access");
+const string Protocol::SCHEMA_ELT_OPTIONAL("optional");
+const string Protocol::SCHEMA_ARGS("args");
+const string Protocol::SCHEMA_PACKAGE("_package_name");
+const string Protocol::SCHEMA_CLASS_KIND("_type");
+const string Protocol::SCHEMA_CLASS_KIND_DATA("_data");
+const string Protocol::SCHEMA_CLASS_KIND_EVENT("_event");
+const string Protocol::SCHEMA_CLASS("_class_name");
+const string Protocol::SCHEMA_HASH("_hash_str");
+const string Protocol::AGENT_NAME("_agent_name");
+const string Protocol::OBJECT_NAME("_object_name");
+const string Protocol::SCHEMA_ID("_schema_id");
+
+#if 0
+bool Protocol::checkHeader(const Message& /*msg*/, string& /*opcode*/, uint32_t* /*seq*/)
+{
+ // TODO
+ return true;
+}
+
+void Protocol::encodeHeader(Message& /*msg*/, const string& /*opcode*/, uint32_t /*seq*/)
+{
+ // TODO
+}
+#endif
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
index fe9b84c565..ec84b4381d 100644
--- a/qpid/cpp/src/qmf/engine/Agent.cpp
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -18,23 +18,19 @@
*/
#include "qmf/engine/Agent.h"
-#include "qmf/engine/MessageImpl.h"
#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
-#include "qmf/engine/EventImpl.h"
#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/Protocol.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
+#include "qmf/Protocol.h"
#include <qpid/sys/Mutex.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
-#include <string.h>
+#include <qpid/sys/Thread.h>
+#include <qpid/sys/Runnable.h>
+#include <qpid/messaging/Session.h>
+#include <qpid/messaging/Receiver.h>
+#include <qpid/messaging/Sender.h>
+#include <qpid/messaging/Message.h>
#include <string>
#include <deque>
#include <map>
@@ -45,8 +41,8 @@
using namespace std;
using namespace qmf::engine;
-using namespace qpid::framing;
using namespace qpid::sys;
+using namespace qpid::messaging;
namespace qmf {
namespace engine {
@@ -59,11 +55,9 @@ namespace engine {
string authToken;
string name;
Object* object;
- boost::shared_ptr<ObjectId> objectId;
+ string objectKey;
boost::shared_ptr<Query> query;
- boost::shared_ptr<Value> arguments;
- string exchange;
- string bindingKey;
+ boost::shared_ptr<Variant::Map> arguments;
const SchemaObjectClass* objectClass;
AgentEventImpl(AgentEvent::EventKind k) :
@@ -72,71 +66,64 @@ namespace engine {
AgentEvent copy();
};
+ /**
+ * AgentQueryContext is used to track asynchronous requests (Query, Sync, or Method)
+ * sent up to the application.
+ */
struct AgentQueryContext {
typedef boost::shared_ptr<AgentQueryContext> Ptr;
uint32_t sequence;
- string exchange;
- string key;
+ string consoleAddr;
const SchemaMethod* schemaMethod;
AgentQueryContext() : schemaMethod(0) {}
};
- class AgentImpl : public boost::noncopyable {
+ class AgentImpl : public boost::noncopyable, public qpid::sys::Runnable {
public:
- AgentImpl(char* label, bool internalStore);
+ AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore);
~AgentImpl();
+ void setAttr(const char* key, const Variant& value);
void setStoreDir(const char* path);
void setTransferDir(const char* path);
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
bool getEvent(AgentEvent& event) const;
void popEvent();
- void newSession();
- void startProtocol();
- void heartbeat();
- void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
- void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat);
+ void setConnection(Connection& conn);
+ void methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments);
+ void queryResponse(uint32_t sequence, Object& object);
void queryComplete(uint32_t sequence);
void registerClass(SchemaObjectClass* cls);
void registerClass(SchemaEventClass* cls);
- const ObjectId* addObject(Object& obj, uint64_t persistId);
- const ObjectId* allocObjectId(uint64_t persistId);
- const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
+ const char* addObject(Object& obj, const char* key);
void raiseEvent(Event& event);
+ void run();
+ void stop();
+
private:
mutable Mutex lock;
Mutex addLock;
- string label;
- string queueName;
+ const string vendor;
+ const string product;
+ const string name;
+ const string domain;
+ string directAddr;
+ map<string, Variant> attrMap;
string storeDir;
string transferDir;
bool internalStore;
- uint64_t nextTransientId;
Uuid systemId;
- uint32_t requestedBrokerBank;
- uint32_t requestedAgentBank;
- uint32_t assignedBrokerBank;
- uint32_t assignedAgentBank;
- AgentAttachment attachment;
uint16_t bootSequence;
- uint64_t nextObjectId;
uint32_t nextContextNum;
+ bool running;
deque<AgentEventImpl::Ptr> eventQueue;
- deque<MessageImpl::Ptr> xmtQueue;
map<uint32_t, AgentQueryContext::Ptr> contextMap;
-
- static const char* QMF_EXCHANGE;
- static const char* DIR_EXCHANGE;
- static const char* BROKER_KEY;
- static const uint32_t MERR_UNKNOWN_METHOD = 2;
- static const uint32_t MERR_UNKNOWN_PACKAGE = 8;
- static const uint32_t MERR_UNKNOWN_CLASS = 9;
- static const uint32_t MERR_INTERNAL_ERROR = 10;
-# define MA_BUFFER_SIZE 65536
- char outputBuffer[MA_BUFFER_SIZE];
+ Connection connection;
+ Session session;
+ Receiver directReceiver;
+ Receiver topicReceiver;
+ Sender sender;
+ qpid::sys::Thread* thread;
struct AgentClassKey {
string name;
@@ -144,10 +131,6 @@ namespace engine {
AgentClassKey(const string& n, const uint8_t* h) : name(n) {
memcpy(hash, h, 16);
}
- AgentClassKey(Buffer& buffer) {
- buffer.getShortString(name);
- buffer.getBin128(hash);
- }
string repr() {
return name;
}
@@ -176,37 +159,29 @@ namespace engine {
map<string, ClassMaps> packages;
- AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
- AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
- AgentEventImpl::Ptr eventSetupComplete();
AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls,
- boost::shared_ptr<ObjectId> oid);
+ const string& key);
AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
- boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
+ const string& key, boost::shared_ptr<Variant::Map> argMap,
const SchemaObjectClass* objectClass);
- void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
+ void handleRcvMessageLH(qpid::messaging::Message& message);
void sendPackageIndicationLH(const string& packageName);
void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
uint32_t code = 0, const string& text = "OK");
void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
- void handleAttachResponse(Buffer& inBuffer);
- void handlePackageRequest(Buffer& inBuffer);
- void handleClassQuery(Buffer& inBuffer);
- void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
+ void handleAttachResponse(Message& msg);
+ void handlePackageRequest(Message& msg);
+ void handleClassQuery(Message& msg);
+ void handleSchemaRequest(Message& msg, uint32_t sequence,
const string& replyToExchange, const string& replyToKey);
- void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
- void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
- void handleConsoleAddedIndication();
+ void handleGetQuery(Message& msg, uint32_t sequence, const string& replyTo, const string& userId);
+ void handleMethodRequest(Message& msg, uint32_t sequence, const string& replyTo, const string& userId);
};
}
}
-const char* AgentImpl::QMF_EXCHANGE = "qpid.management";
-const char* AgentImpl::DIR_EXCHANGE = "amq.direct";
-const char* AgentImpl::BROKER_KEY = "broker";
-
#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
AgentEvent AgentEventImpl::copy()
@@ -217,33 +192,38 @@ AgentEvent AgentEventImpl::copy()
item.kind = kind;
item.sequence = sequence;
item.object = object;
- item.objectId = objectId.get();
item.query = query.get();
item.arguments = arguments.get();
item.objectClass = objectClass;
+ STRING_REF(objectKey);
STRING_REF(authUserId);
STRING_REF(authToken);
STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
return item;
}
-AgentImpl::AgentImpl(char* _label, bool i) :
- label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1),
- requestedBrokerBank(0), requestedAgentBank(0),
- assignedBrokerBank(0), assignedAgentBank(0),
- bootSequence(1), nextObjectId(1), nextContextNum(1)
+AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
+ vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
+ bootSequence(1), nextContextNum(1), running(true), thread(0)
{
- queueName += label;
+ directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
+ if (_d == 0) {
+ directAddr += " { create:always }";
+ }
}
+
AgentImpl::~AgentImpl()
{
}
+void AgentImpl::setAttr(const char* key, const Variant& value)
+{
+ attrMap.insert(pair<string, Variant>(key, value));
+}
+
void AgentImpl::setStoreDir(const char* path)
{
Mutex::ScopedLock _lock(lock);
@@ -262,6 +242,7 @@ void AgentImpl::setTransferDir(const char* path)
transferDir.clear();
}
+/*
void AgentImpl::handleRcvMessage(Message& message)
{
Buffer inBuffer(message.body, message.length);
@@ -283,22 +264,7 @@ void AgentImpl::handleRcvMessage(Message& message)
}
}
}
-
-bool AgentImpl::getXmtMessage(Message& item) const
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void AgentImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
+*/
bool AgentImpl::getEvent(AgentEvent& event) const
{
@@ -316,47 +282,16 @@ void AgentImpl::popEvent()
eventQueue.pop_front();
}
-void AgentImpl::newSession()
-{
- Mutex::ScopedLock _lock(lock);
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind("amq.direct", queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-}
-
-void AgentImpl::startProtocol()
-{
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST);
- buffer.putShortString("qmfa");
- systemId.encode(buffer);
- buffer.putLong(requestedBrokerBank);
- buffer.putLong(requestedAgentBank);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
- " reqAgent=" << requestedAgentBank);
-}
-
-void AgentImpl::heartbeat()
+void AgentImpl::setConnection(Connection& conn)
{
Mutex::ScopedLock _lock(lock);
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-
- Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION);
- buffer.putLongLong(uint64_t(Duration(now())));
- stringstream key;
- key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
- sendBufferLH(buffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT HeartbeatIndication");
+ if (connection == 0)
+ return;
+ connection = conn;
+ thread = new qpid::sys::Thread(*this);
}
-void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
- const Value& argMap)
+void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& /*argMap*/)
{
Mutex::ScopedLock _lock(lock);
map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -365,30 +300,11 @@ void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
AgentQueryContext::Ptr context = iter->second;
contextMap.erase(iter);
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence);
- buffer.putLong(status);
- buffer.putMediumString(text);
- if (status == 0) {
- for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin();
- aIter != context->schemaMethod->impl->arguments.end(); aIter++) {
- const SchemaArgument* schemaArg = *aIter;
- if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) {
- if (argMap.keyInMap(schemaArg->getName())) {
- const Value* val = argMap.byKey(schemaArg->getName());
- val->impl->encode(buffer);
- } else {
- Value val(schemaArg->getType());
- val.impl->encode(buffer);
- }
- }
- }
- }
- sendBufferLH(buffer, context->exchange, context->key);
+ // TODO: Encode method response
QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
}
-void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
+void AgentImpl::queryResponse(uint32_t sequence, Object&)
{
Mutex::ScopedLock _lock(lock);
map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
@@ -396,17 +312,8 @@ void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool
return;
AgentQueryContext::Ptr context = iter->second;
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence);
-
- object.impl->encodeSchemaKey(buffer);
- object.impl->encodeManagedObjectData(buffer);
- if (prop)
- object.impl->encodeProperties(buffer);
- if (stat)
- object.impl->encodeStatistics(buffer);
-
- sendBufferLH(buffer, context->exchange, context->key);
+ // TODO: accumulate data records and send response messages when we have "enough"
+
QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
}
@@ -417,9 +324,11 @@ void AgentImpl::queryComplete(uint32_t sequence)
if (iter == contextMap.end())
return;
+ // TODO: send a response message if there are any unsent data records
+
AgentQueryContext::Ptr context = iter->second;
contextMap.erase(iter);
- sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
+ //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
}
void AgentImpl::registerClass(SchemaObjectClass* cls)
@@ -456,413 +365,148 @@ void AgentImpl::registerClass(SchemaEventClass* cls)
// TODO: Indicate this schema if connected.
}
-const ObjectId* AgentImpl::addObject(Object&, uint64_t)
+const char* AgentImpl::addObject(Object&, const char*)
{
Mutex::ScopedLock _lock(lock);
return 0;
}
-const ObjectId* AgentImpl::allocObjectId(uint64_t persistId)
+void AgentImpl::raiseEvent(Event&)
{
Mutex::ScopedLock _lock(lock);
- uint16_t sequence = persistId ? 0 : bootSequence;
- uint64_t objectNum = persistId ? persistId : nextObjectId++;
-
- ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum);
- return oid;
}
-const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
+void AgentImpl::run()
{
- return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo);
-}
-
-void AgentImpl::raiseEvent(Event& event)
-{
- Mutex::ScopedLock _lock(lock);
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_EVENT_INDICATION);
-
- event.impl->encodeSchemaKey(buffer);
- buffer.putLongLong(uint64_t(Duration(now())));
- event.impl->encode(buffer);
- string key(event.impl->getRoutingKey(assignedBrokerBank, assignedAgentBank));
+ qpid::sys::Duration duration = qpid::sys::TIME_MSEC * 500;
- sendBufferLH(buffer, QMF_EXCHANGE, key);
- QPID_LOG(trace, "SENT EventIndication");
-}
-
-AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
- event->name = name;
+ session = connection.newSession();
+ directReceiver = session.createReceiver(directAddr);
+ directReceiver.setCapacity(10);
- return event;
-}
+ Mutex::ScopedLock _lock(lock);
+ while (running) {
+ Receiver rcvr;
+ bool available;
+ {
+ Mutex::ScopedUnlock _unlock(lock);
+ available = session.nextReceiver(rcvr, duration);
+ }
-AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue,
- const string& key)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND));
- event->name = queue;
- event->exchange = exchange;
- event->bindingKey = key;
+ if (available) {
+ Message msg(rcvr.get());
+ handleRcvMessageLH(msg);
+ }
+ }
- return event;
+ directReceiver.close();
+ session.close();
}
-AgentEventImpl::Ptr AgentImpl::eventSetupComplete()
+void AgentImpl::stop()
{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE));
- return event;
+ Mutex::ScopedLock _lock(lock);
+ running = false;
}
-AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package,
- const string& cls, boost::shared_ptr<ObjectId> oid)
+AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
event->sequence = num;
event->authUserId = userId;
- if (oid.get())
- event->query.reset(new Query(oid.get()));
- else
- event->query.reset(new Query(cls.c_str(), package.c_str()));
+ event->objectKey = key;
return event;
}
AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method,
- boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
- const SchemaObjectClass* objectClass)
+ const string& key, boost::shared_ptr<Variant::Map> argMap,
+ const SchemaObjectClass* objectClass)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL));
event->sequence = num;
event->authUserId = userId;
event->name = method;
- event->objectId = oid;
+ event->objectKey = key;
event->arguments = argMap;
event->objectClass = objectClass;
return event;
}
-void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/)
{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = "amq.direct";
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
}
void AgentImpl::sendPackageIndicationLH(const string& packageName)
{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
- buffer.putShortString(packageName);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ // TODO
QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
}
-void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
+void AgentImpl::sendClassIndicationLH(ClassKind /*kind*/, const string& packageName, const AgentClassKey& key)
{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
- buffer.putOctet((int) kind);
- buffer.putShortString(packageName);
- buffer.putShortString(key.name);
- buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
+ // TODO
QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name);
}
-void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey,
- uint32_t sequence, uint32_t code, const string& text)
+void AgentImpl::sendCommandCompleteLH(const string&, const string&, uint32_t sequence, uint32_t code, const string& text)
{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence);
- buffer.putLong(code);
- buffer.putShortString(text);
- sendBufferLH(buffer, exchange, replyToKey);
+ // TODO
QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
}
-void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
-{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
- buffer.putLong(code);
-
- string fulltext;
- switch (code) {
- case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break;
- case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break;
- case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break;
- case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break;
- default: fulltext = "Unspecified Error"; break;
- }
-
- if (!text.empty()) {
- fulltext += " (";
- fulltext += text;
- fulltext += ")";
- }
-
- buffer.putMediumString(fulltext);
- sendBufferLH(buffer, DIR_EXCHANGE, key);
- QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext);
-}
-
-void AgentImpl::handleAttachResponse(Buffer& inBuffer)
+void AgentImpl::sendMethodErrorLH(uint32_t /*sequence*/, const string& /*key*/, uint32_t code, const string& text)
{
- Mutex::ScopedLock _lock(lock);
-
- assignedBrokerBank = inBuffer.getLong();
- assignedAgentBank = inBuffer.getLong();
-
- QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
-
- if ((assignedBrokerBank != requestedBrokerBank) ||
- (assignedAgentBank != requestedAgentBank)) {
- if (requestedAgentBank == 0) {
- QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
- assignedAgentBank);
- } else {
- QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
- "." << assignedAgentBank);
- }
- //storeData(); // TODO
- requestedBrokerBank = assignedBrokerBank;
- requestedAgentBank = assignedAgentBank;
- }
-
- attachment.setBanks(assignedBrokerBank, assignedAgentBank);
-
- // Bind to qpid.management to receive commands
- stringstream key;
- key << "agent." << assignedBrokerBank << "." << assignedAgentBank;
- eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str()));
-
- // Send package indications for all local packages
- for (map<string, ClassMaps>::iterator pIter = packages.begin();
- pIter != packages.end();
- pIter++) {
- sendPackageIndicationLH(pIter->first);
-
- // Send class indications for all local classes
- ClassMaps cMap = pIter->second;
- for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin();
- cIter != cMap.objectClasses.end(); cIter++)
- sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first);
- for (EventClassMap::iterator cIter = cMap.eventClasses.begin();
- cIter != cMap.eventClasses.end(); cIter++)
- sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first);
- }
+ // TODO
+ QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << text);
}
-void AgentImpl::handlePackageRequest(Buffer&)
+void AgentImpl::handlePackageRequest(Message&)
{
Mutex::ScopedLock _lock(lock);
}
-void AgentImpl::handleClassQuery(Buffer&)
+void AgentImpl::handleClassQuery(Message&)
{
Mutex::ScopedLock _lock(lock);
}
-void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
- const string& replyExchange, const string& replyKey)
+void AgentImpl::handleSchemaRequest(Message&, uint32_t, const string&, const string&)
{
Mutex::ScopedLock _lock(lock);
- string rExchange(replyExchange);
- string rKey(replyKey);
- string packageName;
- inBuffer.getShortString(packageName);
- AgentClassKey key(inBuffer);
-
- if (rExchange.empty())
- rExchange = QMF_EXCHANGE;
- if (rKey.empty())
- rKey = BROKER_KEY;
-
- QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
-
- map<string, ClassMaps>::iterator pIter = packages.find(packageName);
- if (pIter == packages.end()) {
- sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found");
- return;
- }
-
- ClassMaps cMap = pIter->second;
- ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key);
- if (ocIter != cMap.objectClasses.end()) {
- SchemaObjectClass* oImpl = ocIter->second;
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
- oImpl->impl->encode(buffer);
- sendBufferLH(buffer, rExchange, rKey);
- QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
- return;
- }
-
- EventClassMap::iterator ecIter = cMap.eventClasses.find(key);
- if (ecIter != cMap.eventClasses.end()) {
- SchemaEventClass* eImpl = ecIter->second;
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
- eImpl->impl->encode(buffer);
- sendBufferLH(buffer, rExchange, rKey);
- QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
- return;
- }
-
- sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found");
}
-void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId)
+void AgentImpl::handleGetQuery(Message&, uint32_t, const string&, const string&)
{
Mutex::ScopedLock _lock(lock);
- FieldTable ft;
- FieldTable::ValuePtr value;
- map<string, ClassMaps>::const_iterator pIter = packages.end();
- string pname;
- string cname;
- string oidRepr;
- boost::shared_ptr<ObjectId> oid;
-
- ft.decode(inBuffer);
-
- QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft);
-
- value = ft.get("_package");
- if (value.get() && value->convertsTo<string>()) {
- pname = value->get<string>();
- pIter = packages.find(pname);
- if (pIter == packages.end()) {
- sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence);
- return;
- }
- }
-
- value = ft.get("_class");
- if (value.get() && value->convertsTo<string>()) {
- cname = value->get<string>();
- // TODO - check for validity of class (in package or any package)
- if (pIter == packages.end()) {
- } else {
-
- }
- }
-
- value = ft.get("_objectid");
- if (value.get() && value->convertsTo<string>()) {
- oidRepr = value->get<string>();
- oid.reset(new ObjectId());
- oid->impl->fromString(oidRepr);
- }
-
- AgentQueryContext::Ptr context(new AgentQueryContext);
- uint32_t contextNum = nextContextNum++;
- context->sequence = sequence;
- context->exchange = DIR_EXCHANGE;
- context->key = replyTo;
- contextMap[contextNum] = context;
-
- eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid));
}
-void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId)
+void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const string& /*replyTo*/, const string& /*userId*/)
{
Mutex::ScopedLock _lock(lock);
- string pname;
- string method;
- boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer));
- buffer.getShortString(pname);
- AgentClassKey classKey(buffer);
- buffer.getShortString(method);
-
- QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method);
-
- map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
- if (pIter == packages.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
- return;
- }
-
- ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey);
- if (cIter == pIter->second.objectClasses.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr());
- return;
- }
-
- const SchemaObjectClass* schema = cIter->second;
- vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin();
- for (; mIter != schema->impl->methods.end(); mIter++) {
- if ((*mIter)->getName() == method)
- break;
- }
-
- if (mIter == schema->impl->methods.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method);
- return;
- }
-
- const SchemaMethod* schemaMethod = *mIter;
- boost::shared_ptr<Value> argMap(new Value(TYPE_MAP));
- Value* value;
- for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin();
- aIter != schemaMethod->impl->arguments.end(); aIter++) {
- const SchemaArgument* schemaArg = *aIter;
- if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT)
- value = ValueImpl::factory(schemaArg->getType(), buffer);
- else
- value = ValueImpl::factory(schemaArg->getType());
- argMap->insert(schemaArg->getName(), value);
- }
+ QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=");
AgentQueryContext::Ptr context(new AgentQueryContext);
uint32_t contextNum = nextContextNum++;
- context->sequence = sequence;
- context->exchange = DIR_EXCHANGE;
- context->key = replyTo;
- context->schemaMethod = schemaMethod;
contextMap[contextNum] = context;
-
- eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema));
-}
-
-void AgentImpl::handleConsoleAddedIndication()
-{
- Mutex::ScopedLock _lock(lock);
}
//==================================================================
// Wrappers
//==================================================================
-Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); }
+Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); }
Agent::~Agent() { delete impl; }
+void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); }
void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }
-void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void Agent::popXmt() { impl->popXmt(); }
bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
void Agent::popEvent() { impl->popEvent(); }
-void Agent::newSession() { impl->newSession(); }
-void Agent::startProtocol() { impl->startProtocol(); }
-void Agent::heartbeat() { impl->heartbeat(); }
-void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
-void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
+void Agent::setConnection(Connection& conn) { impl->setConnection(conn); }
+void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Variant::Map& arguments) { impl->methodResponse(sequence, status, text, arguments); }
+void Agent::queryResponse(uint32_t sequence, Object& object) { impl->queryResponse(sequence, object); }
void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
-const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
-const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
-const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
+const char* Agent::addObject(Object& obj, const char* key) { return impl->addObject(obj, key); }
void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
index b651b52345..031eb698e0 100644
--- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
+++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
@@ -23,12 +23,11 @@
#include "qmf/engine/Console.h"
#include "qmf/engine/ObjectImpl.h"
#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/ValueImpl.h"
#include "qmf/engine/QueryImpl.h"
#include "qmf/engine/SequenceManager.h"
#include "qmf/engine/MessageImpl.h"
-#include "qpid/framing/Buffer.h"
#include "qpid/framing/Uuid.h"
+#include "qpid/messaging/Variant.h"
#include "qpid/sys/Mutex.h"
#include "boost/shared_ptr.hpp"
#include "boost/noncopyable.hpp"
@@ -46,8 +45,8 @@ namespace engine {
struct MethodResponseImpl {
uint32_t status;
const SchemaMethod* schema;
- std::auto_ptr<Value> exception;
- std::auto_ptr<Value> arguments;
+ std::auto_ptr<qpid::messaging::Variant> exception;
+ std::auto_ptr<qpid::messaging::Variant::Map> arguments;
MethodResponseImpl(const MethodResponseImpl& from);
MethodResponseImpl(qpid::framing::Buffer& buf, const SchemaMethod* schema);
@@ -56,14 +55,14 @@ namespace engine {
static MethodResponse* factory(uint32_t status, const std::string& text);
~MethodResponseImpl() {}
uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
- const Value* getArgs() const { return arguments.get(); }
+ const qpid::messaging::Variant* getException() const { return exception.get(); }
+ const qpid::messaging::Variant::Map* getArgs() const { return arguments.get(); }
};
typedef boost::shared_ptr<QueryResponse> QueryResponsePtr;
struct QueryResponseImpl {
uint32_t status;
- std::auto_ptr<Value> exception;
+ std::auto_ptr<qpid::messaging::Variant> exception;
std::vector<ObjectPtr> results;
QueryResponseImpl() : status(0) {}
@@ -73,7 +72,7 @@ namespace engine {
}
~QueryResponseImpl() {}
uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
+ const qpid::messaging::Variant* getException() const { return exception.get(); }
uint32_t getObjectCount() const { return results.size(); }
const Object* getObject(uint32_t idx) const;
};
@@ -140,8 +139,8 @@ namespace engine {
const AgentProxy* getAgent(uint32_t idx) const;
void sendQuery(const Query& query, void* context, const AgentProxy* agent);
bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
- std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer);
- void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context);
+ std::string encodeMethodArguments(const SchemaMethod* schema, const qpid::messaging::Variant::Map* args, qpid::framing::Buffer& buffer);
+ void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context);
void addBinding(const std::string& exchange, const std::string& key);
void staticRelease() { decOutstanding(); }
@@ -219,6 +218,9 @@ namespace engine {
QueryResponsePtr queryResponse;
};
+ //
+ // MethodContext is used to track and handle the response associated with a single Method Request
+ //
struct MethodContext : public SequenceContext {
MethodContext(BrokerProxyImpl& b, void* u, const SchemaMethod* s) : broker(b), userContext(u), schema(s) {}
virtual ~MethodContext() {}
diff --git a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
deleted file mode 100644
index 22a65f28ca..0000000000
--- a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ConnectionSettingsImpl.h"
-#include "qmf/engine/Typecode.h"
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid;
-
-const string attrProtocol("protocol");
-const string attrHost("host");
-const string attrPort("port");
-const string attrVirtualhost("virtualhost");
-const string attrUsername("username");
-const string attrPassword("password");
-const string attrMechanism("mechanism");
-const string attrLocale("locale");
-const string attrHeartbeat("heartbeat");
-const string attrMaxChannels("maxChannels");
-const string attrMaxFrameSize("maxFrameSize");
-const string attrBounds("bounds");
-const string attrTcpNoDelay("tcpNoDelay");
-const string attrService("service");
-const string attrMinSsf("minSsf");
-const string attrMaxSsf("maxSsf");
-const string attrRetryDelayMin("retryDelayMin");
-const string attrRetryDelayMax("retryDelayMax");
-const string attrRetryDelayFactor("retryDelayFactor");
-const string attrSendUserId("sendUserId");
-
-ConnectionSettingsImpl::ConnectionSettingsImpl() :
- retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true)
-{
-}
-
-ConnectionSettingsImpl::ConnectionSettingsImpl(const string& /*url*/) :
- retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true)
-{
- // TODO: Parse the URL
-}
-
-bool ConnectionSettingsImpl::setAttr(const string& key, const Value& value)
-{
- if (key == attrProtocol) clientSettings.protocol = value.asString();
- else if (key == attrHost) clientSettings.host = value.asString();
- else if (key == attrPort) clientSettings.port = value.asUint();
- else if (key == attrVirtualhost) clientSettings.virtualhost = value.asString();
- else if (key == attrUsername) clientSettings.username = value.asString();
- else if (key == attrPassword) clientSettings.password = value.asString();
- else if (key == attrMechanism) clientSettings.mechanism = value.asString();
- else if (key == attrLocale) clientSettings.locale = value.asString();
- else if (key == attrHeartbeat) clientSettings.heartbeat = value.asUint();
- else if (key == attrMaxChannels) clientSettings.maxChannels = value.asUint();
- else if (key == attrMaxFrameSize) clientSettings.maxFrameSize = value.asUint();
- else if (key == attrBounds) clientSettings.bounds = value.asUint();
- else if (key == attrTcpNoDelay) clientSettings.tcpNoDelay = value.asBool();
- else if (key == attrService) clientSettings.service = value.asString();
- else if (key == attrMinSsf) clientSettings.minSsf = value.asUint();
- else if (key == attrMaxSsf) clientSettings.maxSsf = value.asUint();
-
- else if (key == attrRetryDelayMin) retryDelayMin = value.asUint();
- else if (key == attrRetryDelayMax) retryDelayMax = value.asUint();
- else if (key == attrRetryDelayFactor) retryDelayFactor = value.asUint();
- else if (key == attrSendUserId) sendUserId = value.asBool();
- else
- return false;
- return true;
-}
-
-Value ConnectionSettingsImpl::getAttr(const string& key) const
-{
- Value strval(TYPE_LSTR);
- Value intval(TYPE_UINT32);
- Value boolval(TYPE_BOOL);
-
- if (key == attrProtocol) {
- strval.setString(clientSettings.protocol.c_str());
- return strval;
- }
-
- if (key == attrHost) {
- strval.setString(clientSettings.host.c_str());
- return strval;
- }
-
- if (key == attrPort) {
- intval.setUint(clientSettings.port);
- return intval;
- }
-
- if (key == attrVirtualhost) {
- strval.setString(clientSettings.virtualhost.c_str());
- return strval;
- }
-
- if (key == attrUsername) {
- strval.setString(clientSettings.username.c_str());
- return strval;
- }
-
- if (key == attrPassword) {
- strval.setString(clientSettings.password.c_str());
- return strval;
- }
-
- if (key == attrMechanism) {
- strval.setString(clientSettings.mechanism.c_str());
- return strval;
- }
-
- if (key == attrLocale) {
- strval.setString(clientSettings.locale.c_str());
- return strval;
- }
-
- if (key == attrHeartbeat) {
- intval.setUint(clientSettings.heartbeat);
- return intval;
- }
-
- if (key == attrMaxChannels) {
- intval.setUint(clientSettings.maxChannels);
- return intval;
- }
-
- if (key == attrMaxFrameSize) {
- intval.setUint(clientSettings.maxFrameSize);
- return intval;
- }
-
- if (key == attrBounds) {
- intval.setUint(clientSettings.bounds);
- return intval;
- }
-
- if (key == attrTcpNoDelay) {
- boolval.setBool(clientSettings.tcpNoDelay);
- return boolval;
- }
-
- if (key == attrService) {
- strval.setString(clientSettings.service.c_str());
- return strval;
- }
-
- if (key == attrMinSsf) {
- intval.setUint(clientSettings.minSsf);
- return intval;
- }
-
- if (key == attrMaxSsf) {
- intval.setUint(clientSettings.maxSsf);
- return intval;
- }
-
- if (key == attrRetryDelayMin) {
- intval.setUint(retryDelayMin);
- return intval;
- }
-
- if (key == attrRetryDelayMax) {
- intval.setUint(retryDelayMax);
- return intval;
- }
-
- if (key == attrRetryDelayFactor) {
- intval.setUint(retryDelayFactor);
- return intval;
- }
-
- if (key == attrSendUserId) {
- boolval.setBool(sendUserId);
- return boolval;
- }
-
- return strval;
-}
-
-const string& ConnectionSettingsImpl::getAttrString() const
-{
- // TODO: build and return attribute string
- return attrString;
-}
-
-void ConnectionSettingsImpl::transportTcp(uint16_t port)
-{
- clientSettings.protocol = "tcp";
- clientSettings.port = port;
-}
-
-void ConnectionSettingsImpl::transportSsl(uint16_t port)
-{
- clientSettings.protocol = "ssl";
- clientSettings.port = port;
-}
-
-void ConnectionSettingsImpl::transportRdma(uint16_t port)
-{
- clientSettings.protocol = "rdma";
- clientSettings.port = port;
-}
-
-void ConnectionSettingsImpl::authAnonymous(const string& username)
-{
- clientSettings.mechanism = "ANONYMOUS";
- clientSettings.username = username;
-}
-
-void ConnectionSettingsImpl::authPlain(const string& username, const string& password)
-{
- clientSettings.mechanism = "PLAIN";
- clientSettings.username = username;
- clientSettings.password = password;
-}
-
-void ConnectionSettingsImpl::authGssapi(const string& serviceName, uint32_t minSsf, uint32_t maxSsf)
-{
- clientSettings.mechanism = "GSSAPI";
- clientSettings.service = serviceName;
- clientSettings.minSsf = minSsf;
- clientSettings.maxSsf = maxSsf;
-}
-
-void ConnectionSettingsImpl::setRetry(int delayMin, int delayMax, int delayFactor)
-{
- retryDelayMin = delayMin;
- retryDelayMax = delayMax;
- retryDelayFactor = delayFactor;
-}
-
-const client::ConnectionSettings& ConnectionSettingsImpl::getClientSettings() const
-{
- return clientSettings;
-}
-
-void ConnectionSettingsImpl::getRetrySettings(int* min, int* max, int* factor) const
-{
- *min = retryDelayMin;
- *max = retryDelayMax;
- *factor = retryDelayFactor;
-}
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-ConnectionSettings::ConnectionSettings(const ConnectionSettings& from) { impl = new ConnectionSettingsImpl(*from.impl); }
-ConnectionSettings::ConnectionSettings() { impl = new ConnectionSettingsImpl(); }
-ConnectionSettings::ConnectionSettings(const char* url) { impl = new ConnectionSettingsImpl(url); }
-ConnectionSettings::~ConnectionSettings() { delete impl; }
-bool ConnectionSettings::setAttr(const char* key, const Value& value) { return impl->setAttr(key, value); }
-Value ConnectionSettings::getAttr(const char* key) const { return impl->getAttr(key); }
-const char* ConnectionSettings::getAttrString() const { return impl->getAttrString().c_str(); }
-void ConnectionSettings::transportTcp(uint16_t port) { impl->transportTcp(port); }
-void ConnectionSettings::transportSsl(uint16_t port) { impl->transportSsl(port); }
-void ConnectionSettings::transportRdma(uint16_t port) { impl->transportRdma(port); }
-void ConnectionSettings::authAnonymous(const char* username) { impl->authAnonymous(username); }
-void ConnectionSettings::authPlain(const char* username, const char* password) { impl->authPlain(username, password); }
-void ConnectionSettings::authGssapi(const char* serviceName, uint32_t minSsf, uint32_t maxSsf) { impl->authGssapi(serviceName, minSsf, maxSsf); }
-void ConnectionSettings::setRetry(int delayMin, int delayMax, int delayFactor) { impl->setRetry(delayMin, delayMax, delayFactor); }
-
diff --git a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h
deleted file mode 100644
index 98bf87868b..0000000000
--- a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h
+++ /dev/null
@@ -1,63 +0,0 @@
-#ifndef _QmfEngineConnectionSettingsImpl_
-#define _QmfEngineConnectionSettingsImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ConnectionSettings.h"
-#include "qmf/engine/Value.h"
-#include "qpid/client/ConnectionSettings.h"
-#include <string>
-#include <map>
-
-namespace qmf {
-namespace engine {
-
- class ConnectionSettingsImpl {
- qpid::client::ConnectionSettings clientSettings;
- mutable std::string attrString;
- int retryDelayMin;
- int retryDelayMax;
- int retryDelayFactor;
- bool sendUserId;
-
- public:
- ConnectionSettingsImpl();
- ConnectionSettingsImpl(const std::string& url);
- ~ConnectionSettingsImpl() {}
- bool setAttr(const std::string& key, const Value& value);
- Value getAttr(const std::string& key) const;
- const std::string& getAttrString() const;
- void transportTcp(uint16_t port);
- void transportSsl(uint16_t port);
- void transportRdma(uint16_t port);
- void authAnonymous(const std::string& username);
- void authPlain(const std::string& username, const std::string& password);
- void authGssapi(const std::string& serviceName, uint32_t minSsf, uint32_t maxSsf);
- void setRetry(int delayMin, int delayMax, int delayFactor);
-
- const qpid::client::ConnectionSettings& getClientSettings() const;
- void getRetrySettings(int* delayMin, int* delayMax, int* delayFactor) const;
- bool getSendUserId() const { return sendUserId; }
- };
-
-}
-}
-
-#endif
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.h b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
index 8f99c5e6b9..5e9783b8a0 100644
--- a/qpid/cpp/src/qmf/engine/ConsoleImpl.h
+++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
@@ -23,18 +23,13 @@
#include "qmf/engine/Console.h"
#include "qmf/engine/MessageImpl.h"
#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
#include "qmf/engine/ObjectImpl.h"
#include "qmf/engine/ObjectIdImpl.h"
#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
#include "qmf/engine/Protocol.h"
#include "qmf/engine/SequenceManager.h"
#include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Buffer.h>
#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
#include <qpid/sys/Mutex.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/SystemInfo.h>
diff --git a/qpid/cpp/src/qmf/engine/MessageImpl.cpp b/qpid/cpp/src/qmf/engine/MessageImpl.cpp
deleted file mode 100644
index 0047d3eb9d..0000000000
--- a/qpid/cpp/src/qmf/engine/MessageImpl.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/MessageImpl.h"
-#include <string.h>
-
-using namespace std;
-using namespace qmf::engine;
-
-#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
-
-Message MessageImpl::copy()
-{
- Message item;
-
- ::memset(&item, 0, sizeof(Message));
- item.body = const_cast<char*>(body.c_str());
- item.length = body.length();
- STRING_REF(destination);
- STRING_REF(routingKey);
- STRING_REF(replyExchange);
- STRING_REF(replyKey);
- STRING_REF(userId);
-
- return item;
-}
-
diff --git a/qpid/cpp/src/qmf/engine/MessageImpl.h b/qpid/cpp/src/qmf/engine/MessageImpl.h
deleted file mode 100644
index b91291d2e4..0000000000
--- a/qpid/cpp/src/qmf/engine/MessageImpl.h
+++ /dev/null
@@ -1,44 +0,0 @@
-#ifndef _QmfEngineMessageImpl_
-#define _QmfEngineMessageImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/Message.h"
-#include <string>
-#include <boost/shared_ptr.hpp>
-
-namespace qmf {
-namespace engine {
-
- struct MessageImpl {
- typedef boost::shared_ptr<MessageImpl> Ptr;
- std::string body;
- std::string destination;
- std::string routingKey;
- std::string replyExchange;
- std::string replyKey;
- std::string userId;
-
- Message copy();
- };
-}
-}
-
-#endif
diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
index cae0e0da68..4256b9119e 100644
--- a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
@@ -18,215 +18,50 @@
*/
#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/BrokerProxyImpl.h"
#include <qpid/sys/Time.h>
using namespace std;
using namespace qmf::engine;
using namespace qpid::sys;
-using qpid::framing::Buffer;
+using namespace qpid::messaging;
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type) : objectClass(type), broker(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
+ObjectImpl::ObjectImpl() :
+ objectClass(0), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
{
- int propCount = objectClass->getPropertyCount();
- int statCount = objectClass->getStatisticCount();
- int idx;
-
- for (idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
- }
-
- for (idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- statistics[stat->getName()] = ValuePtr(new Value(stat->getType()));
- }
-}
-
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) :
- objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0)
-{
- int idx;
-
- if (managed) {
- lastUpdatedTime = buffer.getLongLong();
- createTime = buffer.getLongLong();
- destroyTime = buffer.getLongLong();
- objectId.reset(ObjectIdImpl::factory(buffer));
- }
-
- if (prop) {
- int propCount = objectClass->getPropertyCount();
- set<string> excludes;
- parsePresenceMasks(buffer, excludes);
- for (idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (excludes.count(prop->getName()) != 0) {
- properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
- } else {
- Value* pval = ValueImpl::factory(prop->getType(), buffer);
- properties[prop->getName()] = ValuePtr(pval);
- }
- }
- }
-
- if (stat) {
- int statCount = objectClass->getStatisticCount();
- for (idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- Value* sval = ValueImpl::factory(stat->getType(), buffer);
- statistics[stat->getName()] = ValuePtr(sval);
- }
- }
}
-Object* ObjectImpl::factory(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed)
-{
- ObjectImpl* impl(new ObjectImpl(type, b, buffer, prop, stat, managed));
- return new Object(impl);
-}
-ObjectImpl::~ObjectImpl()
+ObjectImpl::ObjectImpl(SchemaObjectClass* type) :
+ objectClass(type), createTime(uint64_t(Duration(now()))), destroyTime(0), lastUpdatedTime(createTime)
{
}
-void ObjectImpl::destroy()
-{
- destroyTime = uint64_t(Duration(now()));
- // TODO - flag deletion
-}
-Value* ObjectImpl::getValue(const string& key) const
+void ObjectImpl::touch()
{
- map<string, ValuePtr>::const_iterator iter;
-
- iter = properties.find(key);
- if (iter != properties.end())
- return iter->second.get();
-
- iter = statistics.find(key);
- if (iter != statistics.end())
- return iter->second.get();
-
- return 0;
+ lastUpdatedTime = uint64_t(Duration(now()));
}
-void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const
-{
- if (broker != 0 && objectId.get() != 0)
- broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context);
-}
-void ObjectImpl::merge(const Object& from)
-{
- for (map<string, ValuePtr>::const_iterator piter = from.impl->properties.begin();
- piter != from.impl->properties.end(); piter++)
- properties[piter->first] = piter->second;
- for (map<string, ValuePtr>::const_iterator siter = from.impl->statistics.begin();
- siter != from.impl->statistics.end(); siter++)
- statistics[siter->first] = siter->second;
-}
-
-void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
-{
- int propCount = objectClass->getPropertyCount();
- excludeList.clear();
- uint8_t bit = 0;
- uint8_t mask = 0;
-
- for (int idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (prop->isOptional()) {
- if (bit == 0) {
- mask = buffer.getOctet();
- bit = 1;
- }
- if ((mask & bit) == 0)
- excludeList.insert(string(prop->getName()));
- if (bit == 0x80)
- bit = 0;
- else
- bit = bit << 1;
- }
- }
-}
-
-void ObjectImpl::encodeSchemaKey(qpid::framing::Buffer& buffer) const
-{
- buffer.putShortString(objectClass->getClassKey()->getPackageName());
- buffer.putShortString(objectClass->getClassKey()->getClassName());
- buffer.putBin128(const_cast<uint8_t*>(objectClass->getClassKey()->getHash()));
-}
-
-void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const
-{
- buffer.putLongLong(lastUpdatedTime);
- buffer.putLongLong(createTime);
- buffer.putLongLong(destroyTime);
- objectId->impl->encode(buffer);
-}
-
-void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const
+void ObjectImpl::destroy()
{
- int propCount = objectClass->getPropertyCount();
- uint8_t bit = 0;
- uint8_t mask = 0;
- ValuePtr value;
-
- for (int idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (prop->isOptional()) {
- value = properties[prop->getName()];
- if (bit == 0)
- bit = 1;
- if (!value->isNull())
- mask |= bit;
- if (bit == 0x80) {
- buffer.putOctet(mask);
- bit = 0;
- mask = 0;
- } else
- bit = bit << 1;
- }
- }
- if (bit != 0) {
- buffer.putOctet(mask);
- }
-
- for (int idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- value = properties[prop->getName()];
- if (!prop->isOptional() || !value->isNull()) {
- value->impl->encode(buffer);
- }
- }
+ destroyTime = uint64_t(Duration(now()));
}
-void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const
-{
- int statCount = objectClass->getStatisticCount();
- for (int idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- ValuePtr value = statistics[stat->getName()];
- value->impl->encode(buffer);
- }
-}
//==================================================================
// Wrappers
//==================================================================
-Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(type)) {}
-Object::Object(ObjectImpl* i) : impl(i) {}
+Object::Object() : impl(new ObjectImpl()) {}
+Object::Object(SchemaObjectClass* type) : impl(new ObjectImpl(type)) {}
Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
Object::~Object() { delete impl; }
+const Variant::Map& Object::getValues() const { return impl->getValues(); }
+Variant::Map& Object::getValues() { return impl->getValues(); }
+const SchemaObjectClass* Object::getSchema() const { return impl->getSchema(); }
+void Object::setSchema(SchemaObjectClass* schema) { impl->setSchema(schema); }
+const char* Object::getKey() const { return impl->getKey(); }
+void Object::setKey(const char* key) { impl->setKey(key); }
+void Object::touch() { impl->touch(); }
void Object::destroy() { impl->destroy(); }
-const ObjectId* Object::getObjectId() const { return impl->getObjectId(); }
-void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
-const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
-Value* Object::getValue(const char* key) const { return impl->getValue(key); }
-void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); }
-bool Object::isDeleted() const { return impl->isDeleted(); }
-void Object::merge(const Object& from) { impl->merge(from); }
-
diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.h b/qpid/cpp/src/qmf/engine/ObjectImpl.h
index 6f25867004..f1d588271a 100644
--- a/qpid/cpp/src/qmf/engine/ObjectImpl.h
+++ b/qpid/cpp/src/qmf/engine/ObjectImpl.h
@@ -21,53 +21,55 @@
*/
#include <qmf/engine/Object.h>
-#include <qmf/engine/ObjectIdImpl.h>
+#include <qpid/sys/Mutex.h>
+#include <qpid/messaging/Variant.h>
#include <map>
#include <set>
#include <string>
-#include <qpid/framing/Buffer.h>
#include <boost/shared_ptr.hpp>
-#include <qpid/sys/Mutex.h>
namespace qmf {
namespace engine {
- class BrokerProxyImpl;
+ class SchemaObjectClass;
typedef boost::shared_ptr<Object> ObjectPtr;
struct ObjectImpl {
- typedef boost::shared_ptr<Value> ValuePtr;
- const SchemaObjectClass* objectClass;
- BrokerProxyImpl* broker;
- boost::shared_ptr<ObjectId> objectId;
+ /**
+ * Content of the object's data
+ */
+ qpid::messaging::Variant::Map values;
+
+ /**
+ * Schema reference if this object is "described"
+ */
+ SchemaObjectClass* objectClass;
+
+ /**
+ * Address and lifecycle information if this object is "managed"
+ * The object is considered "managed" if the key is non-empty.
+ */
+ std::string key;
uint64_t createTime;
uint64_t destroyTime;
uint64_t lastUpdatedTime;
- mutable std::map<std::string, ValuePtr> properties;
- mutable std::map<std::string, ValuePtr> statistics;
- ObjectImpl(const SchemaObjectClass* type);
- ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
- bool prop, bool stat, bool managed);
- static Object* factory(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
- bool prop, bool stat, bool managed);
- ~ObjectImpl();
+ ObjectImpl();
+ ObjectImpl(SchemaObjectClass* type);
+ ~ObjectImpl() {}
- void destroy();
- const ObjectId* getObjectId() const { return objectId.get(); }
- void setObjectId(ObjectId* oid) { objectId.reset(new ObjectId(*oid)); }
- const SchemaObjectClass* getClass() const { return objectClass; }
- Value* getValue(const std::string& key) const;
- void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const;
- bool isDeleted() const { return destroyTime != 0; }
- void merge(const Object& from);
+ const qpid::messaging::Variant::Map& getValues() const { return values; }
+ qpid::messaging::Variant::Map& getValues() { return values; }
+
+ const SchemaObjectClass* getSchema() const { return objectClass; }
+ void setSchema(SchemaObjectClass* schema) { objectClass = schema; }
- void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
- void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
- void encodeManagedObjectData(qpid::framing::Buffer& buffer) const;
- void encodeProperties(qpid::framing::Buffer& buffer) const;
- void encodeStatistics(qpid::framing::Buffer& buffer) const;
+ const char* getKey() const { return key.c_str(); }
+ void setKey(const char* _key) { key = _key; }
+
+ void touch();
+ void destroy();
};
}
}
diff --git a/qpid/cpp/src/qmf/engine/Protocol.cpp b/qpid/cpp/src/qmf/engine/Protocol.cpp
deleted file mode 100644
index 9e5f490604..0000000000
--- a/qpid/cpp/src/qmf/engine/Protocol.cpp
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/Protocol.h"
-#include "qpid/framing/Buffer.h"
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid::framing;
-
-
-bool Protocol::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- if (buf.available() < 8)
- return false;
-
- uint8_t h1 = buf.getOctet();
- uint8_t h2 = buf.getOctet();
- uint8_t h3 = buf.getOctet();
-
- *opcode = buf.getOctet();
- *seq = buf.getLong();
-
- return h1 == 'A' && h2 == 'M' && h3 == '2';
-}
-
-void Protocol::encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet('A');
- buf.putOctet('M');
- buf.putOctet('2');
- buf.putOctet(opcode);
- buf.putLong (seq);
-}
-
-
diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.cpp b/qpid/cpp/src/qmf/engine/QueryImpl.cpp
index 6f2beeee87..0df49ff646 100644
--- a/qpid/cpp/src/qmf/engine/QueryImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/QueryImpl.cpp
@@ -18,52 +18,20 @@
*/
#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/FieldTable.h"
using namespace std;
using namespace qmf::engine;
-using namespace qpid::framing;
+using namespace qpid::messaging;
-bool QueryElementImpl::evaluate(const Object* /*object*/) const
+bool QueryImpl::matches(const Object&) const
{
- // TODO: Implement this
- return false;
+ return true;
}
-bool QueryExpressionImpl::evaluate(const Object* /*object*/) const
-{
- // TODO: Implement this
- return false;
-}
-
-QueryImpl::QueryImpl(Buffer& buffer)
-{
- FieldTable ft;
- ft.decode(buffer);
- // TODO
-}
-
-Query* QueryImpl::factory(Buffer& buffer)
-{
- QueryImpl* impl(new QueryImpl(buffer));
- return new Query(impl);
-}
-void QueryImpl::encode(Buffer& buffer) const
+void QueryImpl::parsePredicate(const std::string&)
{
- FieldTable ft;
-
- if (oid.get() != 0) {
- ft.setString("_objectid", oid->impl->asString());
- } else {
- if (!packageName.empty())
- ft.setString("_package", packageName);
- ft.setString("_class", className);
- }
-
- ft.encode(buffer);
+ predicate.clear();
}
@@ -71,33 +39,21 @@ void QueryImpl::encode(Buffer& buffer) const
// Wrappers
//==================================================================
-QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper oper) : impl(new QueryElementImpl(attrName, value, oper)) {}
-QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {}
-QueryElement::~QueryElement() { delete impl; }
-bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); }
-
-QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {}
-QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {}
-QueryExpression::~QueryExpression() { delete impl; }
-bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); }
-
-Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {}
-Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {}
-Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {}
-Query::Query(QueryImpl* i) : impl(i) {}
+Query::Query(const char* target) : impl(new QueryImpl(target)) {}
+Query::Query(const char* target, const Variant::List& predicate) : impl(new QueryImpl(target, predicate)) {}
+Query::Query(const char* target, const char* expression) : impl(new QueryImpl(target, expression)) {}
Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {}
Query::~Query() { delete impl; }
-void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); }
-void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); }
-void Query::setOrderBy(const char* attrName, bool decreasing) { impl->setOrderBy(attrName, decreasing); }
-const char* Query::getPackage() const { return impl->getPackage().c_str(); }
-const char* Query::getClass() const { return impl->getClass().c_str(); }
-const ObjectId* Query::getObjectId() const { return impl->getObjectId(); }
-bool Query::haveSelect() const { return impl->haveSelect(); }
+void Query::where(const Variant::List& predicate) { impl->where(predicate); }
+void Query::where(const char* expression) { impl->where(expression); }
+void Query::limit(uint32_t maxResults) { impl->limit(maxResults); }
+void Query::orderBy(const char* attrName, bool decreasing) { impl->orderBy(attrName, decreasing); }
+bool Query::havePredicate() const { return impl->havePredicate(); }
bool Query::haveLimit() const { return impl->haveLimit(); }
bool Query::haveOrderBy() const { return impl->haveOrderBy(); }
-const QueryOperand* Query::getSelect() const { return impl->getSelect(); }
+const Variant::List& Query::getPredicate() const { return impl->getPredicate(); }
uint32_t Query::getLimit() const { return impl->getLimit(); }
-const char* Query::getOrderBy() const { return impl->getOrderBy().c_str(); }
+const char* Query::getOrderBy() const { return impl->getOrderBy(); }
bool Query::getDecreasing() const { return impl->getDecreasing(); }
+bool Query::matches(const Object& object) const { return impl->matches(object); }
diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.h b/qpid/cpp/src/qmf/engine/QueryImpl.h
index 8ebe0d932f..0ef8711f8e 100644
--- a/qpid/cpp/src/qmf/engine/QueryImpl.h
+++ b/qpid/cpp/src/qmf/engine/QueryImpl.h
@@ -21,79 +21,41 @@
*/
#include "qmf/engine/Query.h"
-#include "qmf/engine/Schema.h"
+#include <qpid/messaging/Variant.h>
#include <string>
#include <boost/shared_ptr.hpp>
-namespace qpid {
- namespace framing {
- class Buffer;
- }
-}
-
namespace qmf {
namespace engine {
- struct QueryElementImpl {
- QueryElementImpl(const std::string& a, const Value* v, ValueOper o) : attrName(a), value(v), oper(o) {}
- ~QueryElementImpl() {}
- bool evaluate(const Object* object) const;
-
- std::string attrName;
- const Value* value;
- ValueOper oper;
- };
-
- struct QueryExpressionImpl {
- QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) : oper(o), left(operand1), right(operand2) {}
- ~QueryExpressionImpl() {}
- bool evaluate(const Object* object) const;
-
- ExprOper oper;
- const QueryOperand* left;
- const QueryOperand* right;
- };
-
struct QueryImpl {
- // Constructors mapped to public
- QueryImpl(const std::string& c, const std::string& p) : packageName(p), className(c), select(0), resultLimit(0) {}
- QueryImpl(const SchemaClassKey* key) : packageName(key->getPackageName()), className(key->getClassName()), select(0), resultLimit(0) {}
- QueryImpl(const ObjectId* oid) : oid(new ObjectId(*oid)), select(0), resultLimit(0) {}
-
- // Factory constructors
- QueryImpl(qpid::framing::Buffer& buffer);
-
- ~QueryImpl() {};
- static Query* factory(qpid::framing::Buffer& buffer);
-
- void setSelect(const QueryOperand* criterion) { select = criterion; }
- void setLimit(uint32_t maxResults) { resultLimit = maxResults; }
- void setOrderBy(const std::string& attrName, bool decreasing) {
- orderBy = attrName; orderDecreasing = decreasing;
- }
-
- const std::string& getPackage() const { return packageName; }
- const std::string& getClass() const { return className; }
- const ObjectId* getObjectId() const { return oid.get(); }
-
- bool haveSelect() const { return select != 0; }
- bool haveLimit() const { return resultLimit > 0; }
- bool haveOrderBy() const { return !orderBy.empty(); }
- const QueryOperand* getSelect() const { return select; }
+ QueryImpl(const char* _target) : target(_target), resultLimit(0) {}
+ QueryImpl(const char* _target, const qpid::messaging::Variant::List& _predicate) :
+ target(_target), predicate(_predicate), resultLimit(0) {}
+ QueryImpl(const char* _target, const char* expression) :
+ target(_target), resultLimit(0) { parsePredicate(expression); }
+ ~QueryImpl() {}
+
+ void where(const qpid::messaging::Variant::List& _predicate) { predicate = _predicate; }
+ void where(const char* expression) { parsePredicate(expression); }
+ void limit(uint32_t maxResults) { resultLimit = maxResults; }
+ void orderBy(const char* attrName, bool decreasing) { sortAttr = attrName; orderDecreasing = decreasing; }
+
+ bool havePredicate() const { return !predicate.empty(); }
+ bool haveLimit() const { return resultLimit != 0; }
+ bool haveOrderBy() const { return !sortAttr.empty(); }
+ const qpid::messaging::Variant::List& getPredicate() const { return predicate; }
uint32_t getLimit() const { return resultLimit; }
- const std::string& getOrderBy() const { return orderBy; }
+ const char* getOrderBy() const { return sortAttr.c_str(); }
bool getDecreasing() const { return orderDecreasing; }
+ bool matches(const Object& object) const;
- void encode(qpid::framing::Buffer& buffer) const;
- bool singleAgent() const { return oid.get() != 0; }
- uint32_t agentBank() const { return singleAgent() ? oid->getAgentBank() : 0; }
+ void parsePredicate(const std::string& expression);
- std::string packageName;
- std::string className;
- boost::shared_ptr<ObjectId> oid;
- const QueryOperand* select;
+ const std::string target;
+ qpid::messaging::Variant::List predicate;
uint32_t resultLimit;
- std::string orderBy;
+ std::string sortAttr;
bool orderDecreasing;
};
}
diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
deleted file mode 100644
index ab65b8d768..0000000000
--- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
+++ /dev/null
@@ -1,514 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ResilientConnection.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qmf/engine/ConnectionSettingsImpl.h"
-#include <qpid/client/Connection.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/client/SubscriptionManager.h>
-#include <qpid/client/Message.h>
-#include <qpid/sys/Thread.h>
-#include <qpid/sys/Runnable.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/sys/Condition.h>
-#include <qpid/sys/Time.h>
-#include <qpid/log/Statement.h>
-#include <qpid/RefCounted.h>
-#include <boost/bind.hpp>
-#include <string>
-#include <deque>
-#include <vector>
-#include <set>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/noncopyable.hpp>
-#include <unistd.h>
-#include <fcntl.h>
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid;
-using qpid::sys::Mutex;
-
-namespace qmf {
-namespace engine {
- struct ResilientConnectionEventImpl {
- ResilientConnectionEvent::EventKind kind;
- void* sessionContext;
- string errorText;
- MessageImpl message;
-
- ResilientConnectionEventImpl(ResilientConnectionEvent::EventKind k,
- const MessageImpl& m = MessageImpl()) :
- kind(k), sessionContext(0), message(m) {}
- ResilientConnectionEvent copy();
- };
-
- struct RCSession : public client::MessageListener, public qpid::sys::Runnable, public qpid::RefCounted {
- typedef boost::intrusive_ptr<RCSession> Ptr;
- ResilientConnectionImpl& connImpl;
- string name;
- client::Connection& connection;
- client::Session session;
- client::SubscriptionManager* subscriptions;
- string userId;
- void* userContext;
- vector<string> dests;
- qpid::sys::Thread thread;
-
- RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc);
- ~RCSession();
- void received(client::Message& msg);
- void run();
- void stop();
- };
-
- class ResilientConnectionImpl : public qpid::sys::Runnable, public boost::noncopyable {
- public:
- ResilientConnectionImpl(const ConnectionSettings& settings);
- ~ResilientConnectionImpl();
-
- bool isConnected() const;
- bool getEvent(ResilientConnectionEvent& event);
- void popEvent();
- bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
- void destroySession(SessionHandle handle);
- void sendMessage(SessionHandle handle, qmf::engine::Message& message);
- void declareQueue(SessionHandle handle, char* queue);
- void deleteQueue(SessionHandle handle, char* queue);
- void bind(SessionHandle handle, char* exchange, char* queue, char* key);
- void unbind(SessionHandle handle, char* exchange, char* queue, char* key);
- void setNotifyFd(int fd);
- void notify();
-
- void run();
- void failure();
- void sessionClosed(RCSession* sess);
-
- void EnqueueEvent(ResilientConnectionEvent::EventKind kind,
- void* sessionContext = 0,
- const MessageImpl& message = MessageImpl(),
- const string& errorText = "");
-
- private:
- int notifyFd;
- bool connected;
- bool shutdown;
- string lastError;
- const ConnectionSettings settings;
- client::Connection connection;
- mutable qpid::sys::Mutex lock;
- int delayMin;
- int delayMax;
- int delayFactor;
- qpid::sys::Condition cond;
- deque<ResilientConnectionEventImpl> eventQueue;
- set<RCSession::Ptr> sessions;
- qpid::sys::Thread connThread;
- };
-}
-}
-
-ResilientConnectionEvent ResilientConnectionEventImpl::copy()
-{
- ResilientConnectionEvent item;
-
- ::memset(&item, 0, sizeof(ResilientConnectionEvent));
- item.kind = kind;
- item.sessionContext = sessionContext;
- item.message = message.copy();
- item.errorText = const_cast<char*>(errorText.c_str());
-
- return item;
-}
-
-RCSession::RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) :
- connImpl(ci), name(n), connection(c), session(connection.newSession(name)),
- subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this)
-{
- const qpid::client::ConnectionSettings& operSettings = connection.getNegotiatedSettings();
- userId = operSettings.username;
-}
-
-RCSession::~RCSession()
-{
- subscriptions->stop();
- thread.join();
- session.close();
- delete subscriptions;
-}
-
-void RCSession::run()
-{
- try {
- subscriptions->run();
- } catch (exception& /*e*/) {
- connImpl.sessionClosed(this);
- }
-}
-
-void RCSession::stop()
-{
- subscriptions->stop();
-}
-
-void RCSession::received(client::Message& msg)
-{
- MessageImpl qmsg;
- qmsg.body = msg.getData();
-
- qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties();
- if (dp.hasRoutingKey()) {
- qmsg.routingKey = dp.getRoutingKey();
- }
-
- qpid::framing::MessageProperties mp = msg.getMessageProperties();
- if (mp.hasReplyTo()) {
- const qpid::framing::ReplyTo& rt = mp.getReplyTo();
- qmsg.replyExchange = rt.getExchange();
- qmsg.replyKey = rt.getRoutingKey();
- }
-
- if (mp.hasUserId()) {
- qmsg.userId = mp.getUserId();
- }
-
- connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);
-}
-
-ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) :
- notifyFd(-1), connected(false), shutdown(false), settings(_settings), delayMin(1), connThread(*this)
-{
- connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this));
- settings.impl->getRetrySettings(&delayMin, &delayMax, &delayFactor);
-}
-
-ResilientConnectionImpl::~ResilientConnectionImpl()
-{
- shutdown = true;
- connected = false;
- cond.notify();
- connThread.join();
- connection.close();
-}
-
-bool ResilientConnectionImpl::isConnected() const
-{
- Mutex::ScopedLock _lock(lock);
- return connected;
-}
-
-bool ResilientConnectionImpl::getEvent(ResilientConnectionEvent& event)
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front().copy();
- return true;
-}
-
-void ResilientConnectionImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-bool ResilientConnectionImpl::createSession(const char* name, void* sessionContext,
- SessionHandle& handle)
-{
- Mutex::ScopedLock _lock(lock);
- if (!connected)
- return false;
-
- RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext));
-
- handle.impl = (void*) sess.get();
- sessions.insert(sess);
-
- return true;
-}
-
-void ResilientConnectionImpl::destroySession(SessionHandle handle)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
- set<RCSession::Ptr>::iterator iter = sessions.find(sess);
- if (iter != sessions.end()) {
- for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++)
- sess->subscriptions->cancel(dIter->c_str());
- sess->subscriptions->stop();
- sess->subscriptions->wait();
-
- sessions.erase(iter);
- return;
- }
-}
-
-void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::engine::Message& message)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
- set<RCSession::Ptr>::iterator iter = sessions.find(sess);
- qpid::client::Message msg;
- string data(message.body, message.length);
- msg.getDeliveryProperties().setRoutingKey(message.routingKey);
- msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey));
- if (settings.impl->getSendUserId())
- msg.getMessageProperties().setUserId(sess->userId);
- msg.setData(data);
-
- try {
- sess->session.messageTransfer(client::arg::content=msg, client::arg::destination=message.destination);
- } catch(exception& e) {
- QPID_LOG(error, "Session Exception during message-transfer: " << e.what());
- sessions.erase(iter);
- EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, (*iter)->userContext);
- }
-}
-
-void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.queueDeclare(client::arg::queue=queue, client::arg::autoDelete=true, client::arg::exclusive=true);
- sess->subscriptions->setAcceptMode(client::ACCEPT_MODE_NONE);
- sess->subscriptions->setAcquireMode(client::ACQUIRE_MODE_PRE_ACQUIRED);
- sess->subscriptions->subscribe(*sess, queue, queue);
- sess->subscriptions->setFlowControl(queue, client::FlowControl::unlimited());
- sess->dests.push_back(string(queue));
-}
-
-void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.queueDelete(client::arg::queue=queue);
- for (vector<string>::iterator iter = sess->dests.begin();
- iter != sess->dests.end(); iter++)
- if (*iter == queue) {
- sess->subscriptions->cancel(queue);
- sess->dests.erase(iter);
- break;
- }
-}
-
-void ResilientConnectionImpl::bind(SessionHandle handle,
- char* exchange, char* queue, char* key)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.exchangeBind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
-}
-
-void ResilientConnectionImpl::unbind(SessionHandle handle,
- char* exchange, char* queue, char* key)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.exchangeUnbind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
-}
-
-void ResilientConnectionImpl::notify()
-{
- if (notifyFd != -1)
- {
- int unused_ret; //Suppress warnings about ignoring return value.
- unused_ret = ::write(notifyFd, ".", 1);
- }
-}
-
-
-void ResilientConnectionImpl::setNotifyFd(int fd)
-{
- notifyFd = fd;
- if (notifyFd > 0) {
- int original = fcntl(notifyFd, F_GETFL);
- fcntl(notifyFd, F_SETFL, O_NONBLOCK | original);
- }
-}
-
-void ResilientConnectionImpl::run()
-{
- int delay(delayMin);
-
- while (true) {
- try {
- QPID_LOG(trace, "Trying to open connection...");
- connection.open(settings.impl->getClientSettings());
- {
- Mutex::ScopedLock _lock(lock);
- connected = true;
- EnqueueEvent(ResilientConnectionEvent::CONNECTED);
-
- while (connected)
- cond.wait(lock);
- delay = delayMin;
-
- while (!sessions.empty()) {
- set<RCSession::Ptr>::iterator iter = sessions.begin();
- RCSession::Ptr sess = *iter;
- sessions.erase(iter);
- EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext);
- Mutex::ScopedUnlock _u(lock);
- sess->stop();
-
- // Nullify the intrusive pointer within the scoped unlock, otherwise,
- // the reference is held until overwritted above (under lock) which causes
- // the session destructor to be called with the lock held.
- sess = 0;
- }
-
- EnqueueEvent(ResilientConnectionEvent::DISCONNECTED);
-
- if (shutdown)
- return;
- }
- connection.close();
- } catch (exception &e) {
- QPID_LOG(debug, "connection.open exception: " << e.what());
- Mutex::ScopedLock _lock(lock);
- lastError = e.what();
- if (delay < delayMax)
- delay *= delayFactor;
- }
-
- ::qpid::sys::sleep(delay);
- }
-}
-
-void ResilientConnectionImpl::failure()
-{
- Mutex::ScopedLock _lock(lock);
-
- connected = false;
- lastError = "Closed by Peer";
- cond.notify();
-}
-
-void ResilientConnectionImpl::sessionClosed(RCSession*)
-{
- Mutex::ScopedLock _lock(lock);
- connected = false;
- lastError = "Closed due to Session failure";
- cond.notify();
-}
-
-void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind,
- void* sessionContext,
- const MessageImpl& message,
- const string& errorText)
-{
- {
- Mutex::ScopedLock _lock(lock);
- ResilientConnectionEventImpl event(kind, message);
-
- event.sessionContext = sessionContext;
- event.errorText = errorText;
-
- eventQueue.push_back(event);
- }
-
- if (notifyFd != -1)
- {
- int unused_ret; //Suppress warnings about ignoring return value.
- unused_ret = ::write(notifyFd, ".", 1);
- }
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-ResilientConnection::ResilientConnection(const ConnectionSettings& settings)
-{
- impl = new ResilientConnectionImpl(settings);
-}
-
-ResilientConnection::~ResilientConnection()
-{
- delete impl;
-}
-
-bool ResilientConnection::isConnected() const
-{
- return impl->isConnected();
-}
-
-bool ResilientConnection::getEvent(ResilientConnectionEvent& event)
-{
- return impl->getEvent(event);
-}
-
-void ResilientConnection::popEvent()
-{
- impl->popEvent();
-}
-
-bool ResilientConnection::createSession(const char* name, void* sessionContext, SessionHandle& handle)
-{
- return impl->createSession(name, sessionContext, handle);
-}
-
-void ResilientConnection::destroySession(SessionHandle handle)
-{
- impl->destroySession(handle);
-}
-
-void ResilientConnection::sendMessage(SessionHandle handle, qmf::engine::Message& message)
-{
- impl->sendMessage(handle, message);
-}
-
-void ResilientConnection::declareQueue(SessionHandle handle, char* queue)
-{
- impl->declareQueue(handle, queue);
-}
-
-void ResilientConnection::deleteQueue(SessionHandle handle, char* queue)
-{
- impl->deleteQueue(handle, queue);
-}
-
-void ResilientConnection::bind(SessionHandle handle, char* exchange, char* queue, char* key)
-{
- impl->bind(handle, exchange, queue, key);
-}
-
-void ResilientConnection::unbind(SessionHandle handle, char* exchange, char* queue, char* key)
-{
- impl->unbind(handle, exchange, queue, key);
-}
-
-void ResilientConnection::setNotifyFd(int fd)
-{
- impl->setNotifyFd(fd);
-}
-
-void ResilientConnection::notify()
-{
- impl->notify();
-}
-
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
index 249a08ba7f..dcfc0db8e5 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
@@ -18,18 +18,14 @@
*/
#include "qmf/engine/SchemaImpl.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/Uuid.h>
+#include "qmf/Protocol.h"
#include <string.h>
#include <string>
#include <vector>
using namespace std;
using namespace qmf::engine;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::framing::Uuid;
+using namespace qpid::messaging;
SchemaHash::SchemaHash()
{
@@ -37,16 +33,6 @@ SchemaHash::SchemaHash()
hash[idx] = 0x5A;
}
-void SchemaHash::encode(Buffer& buffer) const
-{
- buffer.putBin128(hash);
-}
-
-void SchemaHash::decode(Buffer& buffer)
-{
- buffer.getBin128(hash);
-}
-
void SchemaHash::update(uint8_t data)
{
update((char*) &data, 1);
@@ -81,48 +67,68 @@ bool SchemaHash::operator>(const SchemaHash& other) const
return ::memcmp(&hash, &other.hash, 16) > 0;
}
-SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer)
+
+SchemaArgumentImpl::SchemaArgumentImpl(const Variant::Map& map)
{
- FieldTable map;
- map.decode(buffer);
+ Variant::Map::const_iterator iter;
- name = map.getAsString("name");
- typecode = (Typecode) map.getAsInt("type");
- unit = map.getAsString("unit");
- description = map.getAsString("desc");
+ iter = map.find(Protocol::SCHEMA_ELT_NAME);
+ if (iter == map.end())
+ throw SchemaException("SchemaArgument", Protocol::SCHEMA_ELT_NAME);
+ name = iter->second.asString();
+
+ iter = map.find(Protocol::SCHEMA_ELT_TYPE);
+ if (iter == map.end())
+ throw SchemaException("SchemaArgument", Protocol::SCHEMA_ELT_TYPE);
+ typecode = (Typecode) iter->second.asUint8();
+
+ iter = map.find(Protocol::SCHEMA_ELT_UNIT);
+ if (iter != map.end())
+ unit = iter->second.asString();
+
+ iter = map.find(Protocol::SCHEMA_ELT_DESC);
+ if (iter != map.end())
+ description = iter->second.asString();
dir = DIR_IN;
- string dstr(map.getAsString("dir"));
- if (dstr == "O")
- dir = DIR_OUT;
- else if (dstr == "IO")
- dir = DIR_IN_OUT;
+ iter = map.find(Protocol::SCHEMA_ELT_DIR);
+ if (iter != map.end()) {
+ string dstr(iter->second.asString());
+ if (dstr == "O")
+ dir = DIR_OUT;
+ else if (dstr == "IO")
+ dir = DIR_IN_OUT;
+ }
}
-SchemaArgument* SchemaArgumentImpl::factory(Buffer& buffer)
+SchemaArgument* SchemaArgumentImpl::factory(Variant::Map& map)
{
- SchemaArgumentImpl* impl(new SchemaArgumentImpl(buffer));
+ SchemaArgumentImpl* impl(new SchemaArgumentImpl(map));
return new SchemaArgument(impl);
}
-void SchemaArgumentImpl::encode(Buffer& buffer) const
+Variant::Map SchemaArgumentImpl::asMap() const
{
- FieldTable map;
+ Variant::Map map;
+
+ map[Protocol::SCHEMA_ELT_NAME] = Variant(name);
+ map[Protocol::SCHEMA_ELT_TYPE] = Variant((uint8_t) typecode);
- map.setString("name", name);
- map.setInt("type", (int) typecode);
+ string dirStr;
if (dir == DIR_IN)
- map.setString("dir", "I");
+ dirStr = "I";
else if (dir == DIR_OUT)
- map.setString("dir", "O");
+ dirStr = "O";
else
- map.setString("dir", "IO");
+ dirStr = "IO";
+ map[Protocol::SCHEMA_ELT_DIR] = Variant(dirStr);
+
if (!unit.empty())
- map.setString("unit", unit);
+ map[Protocol::SCHEMA_ELT_UNIT] = Variant(unit);
if (!description.empty())
- map.setString("desc", description);
+ map[Protocol::SCHEMA_ELT_DESC] = Variant(description);
- map.encode(buffer);
+ return map;
}
void SchemaArgumentImpl::updateHash(SchemaHash& hash) const
@@ -134,41 +140,51 @@ void SchemaArgumentImpl::updateHash(SchemaHash& hash) const
hash.update(description);
}
-SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer)
+SchemaMethodImpl::SchemaMethodImpl(const Variant::Map& map)
{
- FieldTable map;
- int argCount;
+ Variant::Map::const_iterator iter;
- map.decode(buffer);
- name = map.getAsString("name");
- argCount = map.getAsInt("argCount");
- description = map.getAsString("desc");
+ iter = map.find(Protocol::SCHEMA_ELT_NAME);
+ if (iter == map.end())
+ throw SchemaException("SchemaMethod", Protocol::SCHEMA_ELT_NAME);
+ name = iter->second.asString();
- for (int idx = 0; idx < argCount; idx++) {
- SchemaArgument* arg = SchemaArgumentImpl::factory(buffer);
- addArgument(arg);
+ iter = map.find(Protocol::SCHEMA_ELT_DESC);
+ if (iter != map.end())
+ description = iter->second.asString();
+
+ iter = map.find(Protocol::SCHEMA_ARGS);
+ if (iter != map.end()) {
+ Variant::List list(iter->second.asList());
+ for (Variant::List::const_iterator aiter = list.begin(); aiter != list.end(); aiter++) {
+ Variant::Map argMap(aiter->asMap());
+ SchemaArgument* arg = SchemaArgumentImpl::factory(argMap);
+ addArgument(arg);
+ }
}
}
-SchemaMethod* SchemaMethodImpl::factory(Buffer& buffer)
+SchemaMethod* SchemaMethodImpl::factory(Variant::Map& map)
{
- SchemaMethodImpl* impl(new SchemaMethodImpl(buffer));
+ SchemaMethodImpl* impl(new SchemaMethodImpl(map));
return new SchemaMethod(impl);
}
-void SchemaMethodImpl::encode(Buffer& buffer) const
+Variant::Map SchemaMethodImpl::asMap() const
{
- FieldTable map;
+ Variant::Map map;
- map.setString("name", name);
- map.setInt("argCount", arguments.size());
+ map[Protocol::SCHEMA_ELT_NAME] = Variant(name);
if (!description.empty())
- map.setString("desc", description);
- map.encode(buffer);
+ map[Protocol::SCHEMA_ELT_DESC] = Variant(description);
+ Variant::List list;
for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
iter != arguments.end(); iter++)
- (*iter)->impl->encode(buffer);
+ list.push_back((*iter)->impl->asMap());
+ map[Protocol::SCHEMA_ARGS] = list;
+
+ return map;
}
void SchemaMethodImpl::addArgument(const SchemaArgument* argument)
@@ -195,41 +211,58 @@ void SchemaMethodImpl::updateHash(SchemaHash& hash) const
(*iter)->impl->updateHash(hash);
}
-SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer)
+SchemaPropertyImpl::SchemaPropertyImpl(const Variant::Map& map)
{
- FieldTable map;
- map.decode(buffer);
+ Variant::Map::const_iterator iter;
+
+ iter = map.find(Protocol::SCHEMA_ELT_NAME);
+ if (iter == map.end())
+ throw SchemaException("SchemaProperty", Protocol::SCHEMA_ELT_NAME);
+ name = iter->second.asString();
+
+ iter = map.find(Protocol::SCHEMA_ELT_TYPE);
+ if (iter == map.end())
+ throw SchemaException("SchemaProperty", Protocol::SCHEMA_ELT_TYPE);
+ typecode = (Typecode) iter->second.asUint8();
+
+ iter = map.find(Protocol::SCHEMA_ELT_ACCESS);
+ if (iter != map.end())
+ access = (Access) iter->second.asUint8();
+
+ iter = map.find(Protocol::SCHEMA_ELT_UNIT);
+ if (iter != map.end())
+ unit = iter->second.asString();
- name = map.getAsString("name");
- typecode = (Typecode) map.getAsInt("type");
- access = (Access) map.getAsInt("access");
- index = map.getAsInt("index") != 0;
- optional = map.getAsInt("optional") != 0;
- unit = map.getAsString("unit");
- description = map.getAsString("desc");
+ iter = map.find(Protocol::SCHEMA_ELT_DESC);
+ if (iter != map.end())
+ description = iter->second.asString();
+
+ iter = map.find(Protocol::SCHEMA_ELT_OPTIONAL);
+ if (iter != map.end())
+ optional = true;
}
-SchemaProperty* SchemaPropertyImpl::factory(Buffer& buffer)
+SchemaProperty* SchemaPropertyImpl::factory(Variant::Map& map)
{
- SchemaPropertyImpl* impl(new SchemaPropertyImpl(buffer));
+ SchemaPropertyImpl* impl(new SchemaPropertyImpl(map));
return new SchemaProperty(impl);
}
-void SchemaPropertyImpl::encode(Buffer& buffer) const
+Variant::Map SchemaPropertyImpl::asMap() const
{
- FieldTable map;
+ Variant::Map map;
- map.setString("name", name);
- map.setInt("type", (int) typecode);
- map.setInt("access", (int) access);
- map.setInt("index", index ? 1 : 0);
- map.setInt("optional", optional ? 1 : 0);
+ map[Protocol::SCHEMA_ELT_NAME] = Variant(name);
+ map[Protocol::SCHEMA_ELT_TYPE] = Variant((uint8_t) typecode);
+ map[Protocol::SCHEMA_ELT_ACCESS] = Variant((uint8_t) access);
+ if (optional)
+ map[Protocol::SCHEMA_ELT_OPTIONAL] = Variant();
if (!unit.empty())
- map.setString("unit", unit);
+ map[Protocol::SCHEMA_ELT_UNIT] = Variant(unit);
if (!description.empty())
- map.setString("desc", description);
+ map[Protocol::SCHEMA_ELT_DESC] = Variant(description);
- map.encode(buffer);
+ return map;
}
void SchemaPropertyImpl::updateHash(SchemaHash& hash) const
@@ -243,52 +276,28 @@ void SchemaPropertyImpl::updateHash(SchemaHash& hash) const
hash.update(description);
}
-SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer)
-{
- FieldTable map;
- map.decode(buffer);
-
- name = map.getAsString("name");
- typecode = (Typecode) map.getAsInt("type");
- unit = map.getAsString("unit");
- description = map.getAsString("desc");
-}
-
-SchemaStatistic* SchemaStatisticImpl::factory(Buffer& buffer)
-{
- SchemaStatisticImpl* impl(new SchemaStatisticImpl(buffer));
- return new SchemaStatistic(impl);
-}
+SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) :
+ package(p), name(n), hash(h) {}
-void SchemaStatisticImpl::encode(Buffer& buffer) const
+SchemaClassKeyImpl::SchemaClassKeyImpl(const Variant::Map& map) :
+ package(packageContainer), name(nameContainer), hash(hashContainer)
{
- FieldTable map;
+ Variant::Map::const_iterator iter;
- map.setString("name", name);
- map.setInt("type", (int) typecode);
- if (!unit.empty())
- map.setString("unit", unit);
- if (!description.empty())
- map.setString("desc", description);
+ iter = map.find(Protocol::SCHEMA_PACKAGE);
+ if (iter == map.end())
+ throw SchemaException("SchemaClassKey", Protocol::SCHEMA_PACKAGE);
+ packageContainer = iter->second.asString();
- map.encode(buffer);
-}
+ iter = map.find(Protocol::SCHEMA_CLASS);
+ if (iter == map.end())
+ throw SchemaException("SchemaClassKey", Protocol::SCHEMA_CLASS);
+ nameContainer = iter->second.asString();
-void SchemaStatisticImpl::updateHash(SchemaHash& hash) const
-{
- hash.update(name);
- hash.update(typecode);
- hash.update(unit);
- hash.update(description);
-}
-
-SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) : package(p), name(n), hash(h) {}
-
-SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) : package(packageContainer), name(nameContainer), hash(hashContainer)
-{
- buffer.getShortString(packageContainer);
- buffer.getShortString(nameContainer);
- hashContainer.decode(buffer);
+ iter = map.find(Protocol::SCHEMA_HASH);
+ if (iter == map.end())
+ throw SchemaException("SchemaClassKey", Protocol::SCHEMA_HASH);
+ hashContainer.set(iter->second.asUuid().data());
}
SchemaClassKey* SchemaClassKeyImpl::factory(const string& package, const string& name, const SchemaHash& hash)
@@ -297,17 +306,21 @@ SchemaClassKey* SchemaClassKeyImpl::factory(const string& package, const string&
return new SchemaClassKey(impl);
}
-SchemaClassKey* SchemaClassKeyImpl::factory(Buffer& buffer)
+SchemaClassKey* SchemaClassKeyImpl::factory(Variant::Map& map)
{
- SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(buffer));
+ SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(map));
return new SchemaClassKey(impl);
}
-void SchemaClassKeyImpl::encode(Buffer& buffer) const
+Variant::Map SchemaClassKeyImpl::asMap() const
{
- buffer.putShortString(package);
- buffer.putShortString(name);
- hash.encode(buffer);
+ Variant::Map map;
+
+ map[Protocol::SCHEMA_PACKAGE] = Variant(package);
+ map[Protocol::SCHEMA_CLASS] = Variant(name);
+ map[Protocol::SCHEMA_HASH] = Variant(); // TODO: use UUID type when available
+
+ return map;
}
bool SchemaClassKeyImpl::operator==(const SchemaClassKeyImpl& other) const
@@ -335,10 +348,21 @@ const string& SchemaClassKeyImpl::str() const
return repr;
}
-SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
+SchemaObjectClassImpl::SchemaObjectClassImpl(const Variant::Map& map) :
+ hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
{
- buffer.getShortString(package);
- buffer.getShortString(name);
+ Variant::Map::const_iterator iter;
+
+ iter = map.find(Protocol::SCHEMA_PACKAGE);
+ if (iter == map.end())
+ throw SchemaException("SchemaObjectClass", Protocol::SCHEMA_PACKAGE);
+ package = iter->second.asString();
+
+ iter = map.find(Protocol::SCHEMA_CLASS);
+ if (iter == map.end())
+ throw SchemaException("SchemaObjectClass", Protocol::SCHEMA_CLASS);
+ name = iter->second.asString();
+
hash.decode(buffer);
uint16_t propCount = buffer.getShort();
@@ -350,24 +374,19 @@ SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), cl
addProperty(property);
}
- for (uint16_t idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* statistic = SchemaStatisticImpl::factory(buffer);
- addStatistic(statistic);
- }
-
for (uint16_t idx = 0; idx < methodCount; idx++) {
SchemaMethod* method = SchemaMethodImpl::factory(buffer);
addMethod(method);
}
}
-SchemaObjectClass* SchemaObjectClassImpl::factory(Buffer& buffer)
+SchemaObjectClass* SchemaObjectClassImpl::factory(Variant::Map& map)
{
SchemaObjectClassImpl* impl(new SchemaObjectClassImpl(buffer));
return new SchemaObjectClass(impl);
}
-void SchemaObjectClassImpl::encode(Buffer& buffer) const
+void SchemaObjectClassImpl::encode(Variant::Map& map) const
{
buffer.putOctet((uint8_t) CLASS_OBJECT);
buffer.putShortString(package);
@@ -454,11 +473,12 @@ const SchemaMethod* SchemaObjectClassImpl::getMethod(int idx) const
return 0;
}
-SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
+SchemaEventClassImpl::SchemaEventClassImpl(Variant::Map& map) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
{
buffer.getShortString(package);
buffer.getShortString(name);
hash.decode(buffer);
+ buffer.putOctet(0); // No parent class
uint16_t argCount = buffer.getShort();
@@ -468,13 +488,13 @@ SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : hasHash(true), clas
}
}
-SchemaEventClass* SchemaEventClassImpl::factory(Buffer& buffer)
+SchemaEventClass* SchemaEventClassImpl::factory(Variant::Map& map)
{
SchemaEventClassImpl* impl(new SchemaEventClassImpl(buffer));
return new SchemaEventClass(impl);
}
-void SchemaEventClassImpl::encode(Buffer& buffer) const
+void SchemaEventClassImpl::encode(Variant::Map& map) const
{
buffer.putOctet((uint8_t) CLASS_EVENT);
buffer.putShortString(package);
@@ -561,17 +581,6 @@ bool SchemaProperty::isOptional() const { return impl->isOptional(); }
const char* SchemaProperty::getUnit() const { return impl->getUnit().c_str(); }
const char* SchemaProperty::getDesc() const { return impl->getDesc().c_str(); }
-SchemaStatistic::SchemaStatistic(const char* name, Typecode typecode) : impl(new SchemaStatisticImpl(name, typecode)) {}
-SchemaStatistic::SchemaStatistic(SchemaStatisticImpl* i) : impl(i) {}
-SchemaStatistic::SchemaStatistic(const SchemaStatistic& from) : impl(new SchemaStatisticImpl(*(from.impl))) {}
-SchemaStatistic::~SchemaStatistic() { delete impl; }
-void SchemaStatistic::setUnit(const char* val) { impl->setUnit(val); }
-void SchemaStatistic::setDesc(const char* desc) { impl->setDesc(desc); }
-const char* SchemaStatistic::getName() const { return impl->getName().c_str(); }
-Typecode SchemaStatistic::getType() const { return impl->getType(); }
-const char* SchemaStatistic::getUnit() const { return impl->getUnit().c_str(); }
-const char* SchemaStatistic::getDesc() const { return impl->getDesc().c_str(); }
-
SchemaClassKey::SchemaClassKey(SchemaClassKeyImpl* i) : impl(i) {}
SchemaClassKey::SchemaClassKey(const SchemaClassKey& from) : impl(new SchemaClassKeyImpl(*(from.impl))) {}
SchemaClassKey::~SchemaClassKey() { delete impl; }
@@ -587,24 +596,20 @@ SchemaObjectClass::SchemaObjectClass(SchemaObjectClassImpl* i) : impl(i) {}
SchemaObjectClass::SchemaObjectClass(const SchemaObjectClass& from) : impl(new SchemaObjectClassImpl(*(from.impl))) {}
SchemaObjectClass::~SchemaObjectClass() { delete impl; }
void SchemaObjectClass::addProperty(const SchemaProperty* property) { impl->addProperty(property); }
-void SchemaObjectClass::addStatistic(const SchemaStatistic* statistic) { impl->addStatistic(statistic); }
void SchemaObjectClass::addMethod(const SchemaMethod* method) { impl->addMethod(method); }
const SchemaClassKey* SchemaObjectClass::getClassKey() const { return impl->getClassKey(); }
int SchemaObjectClass::getPropertyCount() const { return impl->getPropertyCount(); }
-int SchemaObjectClass::getStatisticCount() const { return impl->getStatisticCount(); }
int SchemaObjectClass::getMethodCount() const { return impl->getMethodCount(); }
const SchemaProperty* SchemaObjectClass::getProperty(int idx) const { return impl->getProperty(idx); }
-const SchemaStatistic* SchemaObjectClass::getStatistic(int idx) const { return impl->getStatistic(idx); }
const SchemaMethod* SchemaObjectClass::getMethod(int idx) const { return impl->getMethod(idx); }
-SchemaEventClass::SchemaEventClass(const char* package, const char* name, Severity s) : impl(new SchemaEventClassImpl(package, name, s)) {}
+SchemaEventClass::SchemaEventClass(const char* package, const char* name) : impl(new SchemaEventClassImpl(package, name)) {}
SchemaEventClass::SchemaEventClass(SchemaEventClassImpl* i) : impl(i) {}
SchemaEventClass::SchemaEventClass(const SchemaEventClass& from) : impl(new SchemaEventClassImpl(*(from.impl))) {}
SchemaEventClass::~SchemaEventClass() { delete impl; }
void SchemaEventClass::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); }
void SchemaEventClass::setDesc(const char* desc) { impl->setDesc(desc); }
const SchemaClassKey* SchemaEventClass::getClassKey() const { return impl->getClassKey(); }
-Severity SchemaEventClass::getSeverity() const { return impl->getSeverity(); }
int SchemaEventClass::getArgumentCount() const { return impl->getArgumentCount(); }
const SchemaArgument* SchemaEventClass::getArgument(int idx) const { return impl->getArgument(idx); }
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h
index 7be757ee8d..a26bc07f6d 100644
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.h
+++ b/qpid/cpp/src/qmf/engine/SchemaImpl.h
@@ -21,9 +21,10 @@
*/
#include "qmf/engine/Schema.h"
+#include <string.h>
#include <string>
#include <vector>
-#include <qpid/framing/Buffer.h>
+#include <exception>
namespace qmf {
namespace engine {
@@ -32,20 +33,33 @@ namespace engine {
// TODO: Add "frozen" attribute for schema classes so they can't be modified after
// they've been registered.
+ typedef qpid::messaging::VariantType Typecode;
+
+ class SchemaException : public std::exception {
+ public:
+ SchemaException(const std::string& context, const std::string& expected) {
+ text = context + ": Expected item with key " + expected;
+ }
+ virtual ~SchemaException() throw();
+ virtual const char* what() const throw() { return text.c_str(); }
+
+ private:
+ std::string text;
+ };
+
class SchemaHash {
uint8_t hash[16];
public:
SchemaHash();
- void encode(qpid::framing::Buffer& buffer) const;
- void decode(qpid::framing::Buffer& buffer);
void update(const char* data, uint32_t len);
void update(uint8_t data);
void update(const std::string& data) { update(data.c_str(), data.size()); }
- void update(Typecode t) { update((uint8_t) t); }
void update(Direction d) { update((uint8_t) d); }
void update(Access a) { update((uint8_t) a); }
+ void update(Typecode a) { update((uint8_t) a); }
void update(bool b) { update((uint8_t) (b ? 1 : 0)); }
const uint8_t* get() const { return hash; }
+ void set(const uint8_t* val) { ::memcpy(hash, val, 16); }
bool operator==(const SchemaHash& other) const;
bool operator<(const SchemaHash& other) const;
bool operator>(const SchemaHash& other) const;
@@ -59,9 +73,9 @@ namespace engine {
std::string description;
SchemaArgumentImpl(const char* n, Typecode t) : name(n), typecode(t), dir(DIR_IN) {}
- SchemaArgumentImpl(qpid::framing::Buffer& buffer);
- static SchemaArgument* factory(qpid::framing::Buffer& buffer);
- void encode(qpid::framing::Buffer& buffer) const;
+ SchemaArgumentImpl(const qpid::messaging::Variant::Map& map);
+ static SchemaArgument* factory(qpid::messaging::Variant::Map& map);
+ qpid::messaging::Variant::Map asMap() const;
void setDirection(Direction d) { dir = d; }
void setUnit(const char* val) { unit = val; }
void setDesc(const char* desc) { description = desc; }
@@ -79,9 +93,9 @@ namespace engine {
std::vector<const SchemaArgument*> arguments;
SchemaMethodImpl(const char* n) : name(n) {}
- SchemaMethodImpl(qpid::framing::Buffer& buffer);
- static SchemaMethod* factory(qpid::framing::Buffer& buffer);
- void encode(qpid::framing::Buffer& buffer) const;
+ SchemaMethodImpl(const qpid::messaging::Variant::Map& map);
+ static SchemaMethod* factory(qpid::messaging::Variant::Map& map);
+ qpid::messaging::Variant::Map asMap() const;
void addArgument(const SchemaArgument* argument);
void setDesc(const char* desc) { description = desc; }
const std::string& getName() const { return name; }
@@ -101,9 +115,9 @@ namespace engine {
std::string description;
SchemaPropertyImpl(const char* n, Typecode t) : name(n), typecode(t), access(ACCESS_READ_ONLY), index(false), optional(false) {}
- SchemaPropertyImpl(qpid::framing::Buffer& buffer);
- static SchemaProperty* factory(qpid::framing::Buffer& buffer);
- void encode(qpid::framing::Buffer& buffer) const;
+ SchemaPropertyImpl(const qpid::messaging::Variant::Map& map);
+ static SchemaProperty* factory(qpid::messaging::Variant::Map& map);
+ qpid::messaging::Variant::Map asMap() const;
void setAccess(Access a) { access = a; }
void setIndex(bool val) { index = val; }
void setOptional(bool val) { optional = val; }
@@ -119,25 +133,6 @@ namespace engine {
void updateHash(SchemaHash& hash) const;
};
- struct SchemaStatisticImpl {
- std::string name;
- Typecode typecode;
- std::string unit;
- std::string description;
-
- SchemaStatisticImpl(const char* n, Typecode t) : name(n), typecode(t) {}
- SchemaStatisticImpl(qpid::framing::Buffer& buffer);
- static SchemaStatistic* factory(qpid::framing::Buffer& buffer);
- void encode(qpid::framing::Buffer& buffer) const;
- void setUnit(const char* val) { unit = val; }
- void setDesc(const char* desc) { description = desc; }
- const std::string& getName() const { return name; }
- Typecode getType() const { return typecode; }
- const std::string& getUnit() const { return unit; }
- const std::string& getDesc() const { return description; }
- void updateHash(SchemaHash& hash) const;
- };
-
struct SchemaClassKeyImpl {
const std::string& package;
const std::string& name;
@@ -151,15 +146,15 @@ namespace engine {
SchemaHash hashContainer;
SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash);
- SchemaClassKeyImpl(qpid::framing::Buffer& buffer);
+ SchemaClassKeyImpl(const qpid::messaging::Variant::Map& map);
static SchemaClassKey* factory(const std::string& package, const std::string& name, const SchemaHash& hash);
- static SchemaClassKey* factory(qpid::framing::Buffer& buffer);
+ static SchemaClassKey* factory(qpid::messaging::Variant::Map& map);
const std::string& getPackageName() const { return package; }
const std::string& getClassName() const { return name; }
const uint8_t* getHash() const { return hash.get(); }
- void encode(qpid::framing::Buffer& buffer) const;
+ qpid::messaging::Variant::Map asMap() const;
bool operator==(const SchemaClassKeyImpl& other) const;
bool operator<(const SchemaClassKeyImpl& other) const;
const std::string& str() const;
@@ -172,25 +167,21 @@ namespace engine {
mutable bool hasHash;
std::auto_ptr<SchemaClassKey> classKey;
std::vector<const SchemaProperty*> properties;
- std::vector<const SchemaStatistic*> statistics;
std::vector<const SchemaMethod*> methods;
SchemaObjectClassImpl(const char* p, const char* n) :
package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {}
- SchemaObjectClassImpl(qpid::framing::Buffer& buffer);
- static SchemaObjectClass* factory(qpid::framing::Buffer& buffer);
+ SchemaObjectClassImpl(const qpid::messaging::Variant::Map& map);
+ static SchemaObjectClass* factory(qpid::messaging::Variant::Map& map);
- void encode(qpid::framing::Buffer& buffer) const;
+ qpid::messaging::Variant::Map asMap() const;
void addProperty(const SchemaProperty* property);
- void addStatistic(const SchemaStatistic* statistic);
void addMethod(const SchemaMethod* method);
const SchemaClassKey* getClassKey() const;
int getPropertyCount() const { return properties.size(); }
- int getStatisticCount() const { return statistics.size(); }
int getMethodCount() const { return methods.size(); }
const SchemaProperty* getProperty(int idx) const;
- const SchemaStatistic* getStatistic(int idx) const;
const SchemaMethod* getMethod(int idx) const;
};
@@ -201,20 +192,18 @@ namespace engine {
mutable bool hasHash;
std::auto_ptr<SchemaClassKey> classKey;
std::string description;
- Severity severity;
std::vector<const SchemaArgument*> arguments;
- SchemaEventClassImpl(const char* p, const char* n, Severity sev) :
- package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)), severity(sev) {}
- SchemaEventClassImpl(qpid::framing::Buffer& buffer);
- static SchemaEventClass* factory(qpid::framing::Buffer& buffer);
+ SchemaEventClassImpl(const char* p, const char* n) :
+ package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {}
+ SchemaEventClassImpl(const qpid::messaging::Variant::Map& map);
+ static SchemaEventClass* factory(qpid::messaging::Variant::Map& map);
- void encode(qpid::framing::Buffer& buffer) const;
+ qpid::messaging::Variant::Map asMap() const;
void addArgument(const SchemaArgument* argument);
void setDesc(const char* desc) { description = desc; }
const SchemaClassKey* getClassKey() const;
- Severity getSeverity() const { return severity; }
int getArgumentCount() const { return arguments.size(); }
const SchemaArgument* getArgument(int idx) const;
};
diff --git a/qpid/cpp/src/qmf/engine/ValueImpl.cpp b/qpid/cpp/src/qmf/engine/ValueImpl.cpp
deleted file mode 100644
index b1c027520f..0000000000
--- a/qpid/cpp/src/qmf/engine/ValueImpl.cpp
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ValueImpl.h"
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
-
-using namespace std;
-using namespace qmf::engine;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::framing::FieldValue;
-
-ValueImpl::ValueImpl(Typecode t, Buffer& buf) : typecode(t)
-{
- uint64_t first;
- uint64_t second;
- FieldTable ft;
-
- switch (typecode) {
- case TYPE_UINT8 : value.u32 = (uint32_t) buf.getOctet(); break;
- case TYPE_UINT16 : value.u32 = (uint32_t) buf.getShort(); break;
- case TYPE_UINT32 : value.u32 = (uint32_t) buf.getLong(); break;
- case TYPE_UINT64 : value.u64 = buf.getLongLong(); break;
- case TYPE_SSTR : buf.getShortString(stringVal); break;
- case TYPE_LSTR : buf.getMediumString(stringVal); break;
- case TYPE_ABSTIME : value.s64 = buf.getLongLong(); break;
- case TYPE_DELTATIME : value.u64 = buf.getLongLong(); break;
- case TYPE_BOOL : value.boolVal = (buf.getOctet() != 0); break;
- case TYPE_FLOAT : value.floatVal = buf.getFloat(); break;
- case TYPE_DOUBLE : value.doubleVal = buf.getDouble(); break;
- case TYPE_INT8 : value.s32 = (int32_t) ((int8_t) buf.getOctet()); break;
- case TYPE_INT16 : value.s32 = (int32_t) ((int16_t) buf.getShort()); break;
- case TYPE_INT32 : value.s32 = (int32_t) buf.getLong(); break;
- case TYPE_INT64 : value.s64 = buf.getLongLong(); break;
- case TYPE_UUID : buf.getBin128(value.uuidVal); break;
- case TYPE_REF:
- first = buf.getLongLong();
- second = buf.getLongLong();
- refVal.impl->setValue(first, second);
- break;
-
- case TYPE_MAP:
- ft.decode(buf);
- initMap(ft);
- break;
-
- case TYPE_LIST:
- case TYPE_ARRAY:
- case TYPE_OBJECT:
- default:
- break;
- }
-}
-
-ValueImpl::ValueImpl(Typecode t, Typecode at) : typecode(t), valid(false), arrayTypecode(at)
-{
-}
-
-ValueImpl::ValueImpl(Typecode t) : typecode(t)
-{
- ::memset(&value, 0, sizeof(value));
-}
-
-Value* ValueImpl::factory(Typecode t, Buffer& b)
-{
- ValueImpl* impl(new ValueImpl(t, b));
- return new Value(impl);
-}
-
-Value* ValueImpl::factory(Typecode t)
-{
- ValueImpl* impl(new ValueImpl(t));
- return new Value(impl);
-}
-
-ValueImpl::~ValueImpl()
-{
-}
-
-void ValueImpl::initMap(const FieldTable& ft)
-{
- for (FieldTable::ValueMap::const_iterator iter = ft.begin();
- iter != ft.end(); iter++) {
- const string& name(iter->first);
- const FieldValue& fvalue(*iter->second);
- uint8_t amqType = fvalue.getType();
-
- if (amqType == 0x32) {
- Value* subval(new Value(TYPE_UINT64));
- subval->setUint64(fvalue.get<int64_t>());
- insert(name.c_str(), subval);
- } else if ((amqType & 0xCF) == 0x02) {
- Value* subval(new Value(TYPE_UINT32));
- switch (amqType) {
- case 0x02 : subval->setUint(fvalue.get<int>()); break;
- case 0x12 : subval->setUint(fvalue.get<int>()); break;
- case 0x22 : subval->setUint(fvalue.get<int>()); break;
- }
- insert(name.c_str(), subval);
- } else if ((amqType & 0xCF) == 0x01) {
- Value* subval(new Value(TYPE_INT64));
- subval->setInt64(fvalue.get<int64_t>());
- insert(name.c_str(), subval);
- } else if (amqType == 0x85 || amqType == 0x95) {
- Value* subval(new Value(TYPE_LSTR));
- subval->setString(fvalue.get<string>().c_str());
- insert(name.c_str(), subval);
- } else if (amqType == 0x23 || amqType == 0x33) {
- Value* subval(new Value(TYPE_DOUBLE));
- subval->setDouble(fvalue.get<double>());
- insert(name.c_str(), subval);
- } else {
- FieldTable subFt;
- bool valid = qpid::framing::getEncodedValue<FieldTable>(iter->second, subFt);
- if (valid) {
- Value* subval(new Value(TYPE_MAP));
- subval->impl->initMap(subFt);
- insert(name.c_str(), subval);
- }
- }
- }
-}
-
-void ValueImpl::mapToFieldTable(FieldTable& ft) const
-{
- FieldTable subFt;
-
- for (map<string, Value>::const_iterator iter = mapVal.begin();
- iter != mapVal.end(); iter++) {
- const string& name(iter->first);
- const Value& subval(iter->second);
-
- switch (subval.getType()) {
- case TYPE_UINT8:
- case TYPE_UINT16:
- case TYPE_UINT32:
- ft.setUInt64(name, (uint64_t) subval.asUint());
- break;
- case TYPE_UINT64:
- case TYPE_DELTATIME:
- ft.setUInt64(name, subval.asUint64());
- break;
- case TYPE_SSTR:
- case TYPE_LSTR:
- ft.setString(name, subval.asString());
- break;
- case TYPE_INT64:
- case TYPE_ABSTIME:
- ft.setInt64(name, subval.asInt64());
- break;
- case TYPE_BOOL:
- ft.setInt(name, subval.asBool() ? 1 : 0);
- break;
- case TYPE_FLOAT:
- ft.setFloat(name, subval.asFloat());
- break;
- case TYPE_DOUBLE:
- ft.setDouble(name, subval.asDouble());
- break;
- case TYPE_INT8:
- case TYPE_INT16:
- case TYPE_INT32:
- ft.setInt(name, subval.asInt());
- break;
- case TYPE_MAP:
- subFt.clear();
- subval.impl->mapToFieldTable(subFt);
- ft.setTable(name, subFt);
- break;
- case TYPE_LIST:
- case TYPE_ARRAY:
- case TYPE_OBJECT:
- case TYPE_UUID:
- case TYPE_REF:
- default:
- break;
- }
- }
- }
-
-void ValueImpl::encode(Buffer& buf) const
-{
- FieldTable ft;
-
- switch (typecode) {
- case TYPE_UINT8 : buf.putOctet((uint8_t) value.u32); break;
- case TYPE_UINT16 : buf.putShort((uint16_t) value.u32); break;
- case TYPE_UINT32 : buf.putLong(value.u32); break;
- case TYPE_UINT64 : buf.putLongLong(value.u64); break;
- case TYPE_SSTR : buf.putShortString(stringVal); break;
- case TYPE_LSTR : buf.putMediumString(stringVal); break;
- case TYPE_ABSTIME : buf.putLongLong(value.s64); break;
- case TYPE_DELTATIME : buf.putLongLong(value.u64); break;
- case TYPE_BOOL : buf.putOctet(value.boolVal ? 1 : 0); break;
- case TYPE_FLOAT : buf.putFloat(value.floatVal); break;
- case TYPE_DOUBLE : buf.putDouble(value.doubleVal); break;
- case TYPE_INT8 : buf.putOctet((uint8_t) value.s32); break;
- case TYPE_INT16 : buf.putShort((uint16_t) value.s32); break;
- case TYPE_INT32 : buf.putLong(value.s32); break;
- case TYPE_INT64 : buf.putLongLong(value.s64); break;
- case TYPE_UUID : buf.putBin128(value.uuidVal); break;
- case TYPE_REF : refVal.impl->encode(buf); break;
- case TYPE_MAP:
- mapToFieldTable(ft);
- ft.encode(buf);
- break;
- case TYPE_LIST:
- case TYPE_ARRAY:
- case TYPE_OBJECT:
- default:
- break;
- }
-}
-
-bool ValueImpl::keyInMap(const char* key) const
-{
- return typecode == TYPE_MAP && mapVal.count(key) > 0;
-}
-
-Value* ValueImpl::byKey(const char* key)
-{
- if (keyInMap(key)) {
- map<string, Value>::iterator iter = mapVal.find(key);
- if (iter != mapVal.end())
- return &iter->second;
- }
- return 0;
-}
-
-const Value* ValueImpl::byKey(const char* key) const
-{
- if (keyInMap(key)) {
- map<string, Value>::const_iterator iter = mapVal.find(key);
- if (iter != mapVal.end())
- return &iter->second;
- }
- return 0;
-}
-
-void ValueImpl::deleteKey(const char* key)
-{
- mapVal.erase(key);
-}
-
-void ValueImpl::insert(const char* key, Value* val)
-{
- pair<string, Value> entry(key, *val);
- mapVal.insert(entry);
-}
-
-const char* ValueImpl::key(uint32_t idx) const
-{
- map<string, Value>::const_iterator iter = mapVal.begin();
- for (uint32_t i = 0; i < idx; i++) {
- if (iter == mapVal.end())
- break;
- iter++;
- }
-
- if (iter == mapVal.end())
- return 0;
- else
- return iter->first.c_str();
-}
-
-Value* ValueImpl::listItem(uint32_t)
-{
- return 0;
-}
-
-void ValueImpl::appendToList(Value*)
-{
-}
-
-void ValueImpl::deleteListItem(uint32_t)
-{
-}
-
-Value* ValueImpl::arrayItem(uint32_t)
-{
- return 0;
-}
-
-void ValueImpl::appendToArray(Value*)
-{
-}
-
-void ValueImpl::deleteArrayItem(uint32_t)
-{
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-Value::Value(const Value& from) : impl(new ValueImpl(*(from.impl))) {}
-Value::Value(Typecode t, Typecode at) : impl(new ValueImpl(t, at)) {}
-Value::Value(ValueImpl* i) : impl(i) {}
-Value::~Value() { delete impl;}
-
-Typecode Value::getType() const { return impl->getType(); }
-bool Value::isNull() const { return impl->isNull(); }
-void Value::setNull() { impl->setNull(); }
-bool Value::isObjectId() const { return impl->isObjectId(); }
-const ObjectId& Value::asObjectId() const { return impl->asObjectId(); }
-void Value::setObjectId(const ObjectId& oid) { impl->setObjectId(oid); }
-bool Value::isUint() const { return impl->isUint(); }
-uint32_t Value::asUint() const { return impl->asUint(); }
-void Value::setUint(uint32_t val) { impl->setUint(val); }
-bool Value::isInt() const { return impl->isInt(); }
-int32_t Value::asInt() const { return impl->asInt(); }
-void Value::setInt(int32_t val) { impl->setInt(val); }
-bool Value::isUint64() const { return impl->isUint64(); }
-uint64_t Value::asUint64() const { return impl->asUint64(); }
-void Value::setUint64(uint64_t val) { impl->setUint64(val); }
-bool Value::isInt64() const { return impl->isInt64(); }
-int64_t Value::asInt64() const { return impl->asInt64(); }
-void Value::setInt64(int64_t val) { impl->setInt64(val); }
-bool Value::isString() const { return impl->isString(); }
-const char* Value::asString() const { return impl->asString(); }
-void Value::setString(const char* val) { impl->setString(val); }
-bool Value::isBool() const { return impl->isBool(); }
-bool Value::asBool() const { return impl->asBool(); }
-void Value::setBool(bool val) { impl->setBool(val); }
-bool Value::isFloat() const { return impl->isFloat(); }
-float Value::asFloat() const { return impl->asFloat(); }
-void Value::setFloat(float val) { impl->setFloat(val); }
-bool Value::isDouble() const { return impl->isDouble(); }
-double Value::asDouble() const { return impl->asDouble(); }
-void Value::setDouble(double val) { impl->setDouble(val); }
-bool Value::isUuid() const { return impl->isUuid(); }
-const uint8_t* Value::asUuid() const { return impl->asUuid(); }
-void Value::setUuid(const uint8_t* val) { impl->setUuid(val); }
-bool Value::isObject() const { return impl->isObject(); }
-const Object* Value::asObject() const { return impl->asObject(); }
-void Value::setObject(Object* val) { impl->setObject(val); }
-bool Value::isMap() const { return impl->isMap(); }
-bool Value::keyInMap(const char* key) const { return impl->keyInMap(key); }
-Value* Value::byKey(const char* key) { return impl->byKey(key); }
-const Value* Value::byKey(const char* key) const { return impl->byKey(key); }
-void Value::deleteKey(const char* key) { impl->deleteKey(key); }
-void Value::insert(const char* key, Value* val) { impl->insert(key, val); }
-uint32_t Value::keyCount() const { return impl->keyCount(); }
-const char* Value::key(uint32_t idx) const { return impl->key(idx); }
-bool Value::isList() const { return impl->isList(); }
-uint32_t Value::listItemCount() const { return impl->listItemCount(); }
-Value* Value::listItem(uint32_t idx) { return impl->listItem(idx); }
-void Value::appendToList(Value* val) { impl->appendToList(val); }
-void Value::deleteListItem(uint32_t idx) { impl->deleteListItem(idx); }
-bool Value::isArray() const { return impl->isArray(); }
-Typecode Value::arrayType() const { return impl->arrayType(); }
-uint32_t Value::arrayItemCount() const { return impl->arrayItemCount(); }
-Value* Value::arrayItem(uint32_t idx) { return impl->arrayItem(idx); }
-void Value::appendToArray(Value* val) { impl->appendToArray(val); }
-void Value::deleteArrayItem(uint32_t idx) { impl->deleteArrayItem(idx); }
-
diff --git a/qpid/cpp/src/qmf/engine/ValueImpl.h b/qpid/cpp/src/qmf/engine/ValueImpl.h
deleted file mode 100644
index 84b0e768e6..0000000000
--- a/qpid/cpp/src/qmf/engine/ValueImpl.h
+++ /dev/null
@@ -1,160 +0,0 @@
-#ifndef _QmfEngineValueImpl_
-#define _QmfEngineValueImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/Value.h>
-#include <qmf/engine/ObjectIdImpl.h>
-#include <qmf/engine/Object.h>
-#include <qpid/framing/Buffer.h>
-#include <string>
-#include <string.h>
-#include <map>
-#include <vector>
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-namespace framing {
- class FieldTable;
-}
-}
-
-namespace qmf {
-namespace engine {
-
- // TODO: set valid flag on all value settors
- // TODO: add a modified flag and accessors
-
- struct ValueImpl {
- const Typecode typecode;
- bool valid;
-
- ObjectId refVal;
- std::string stringVal;
- std::auto_ptr<Object> objectVal;
- std::map<std::string, Value> mapVal;
- std::vector<Value> vectorVal;
- Typecode arrayTypecode;
-
- union {
- uint32_t u32;
- uint64_t u64;
- int32_t s32;
- int64_t s64;
- bool boolVal;
- float floatVal;
- double doubleVal;
- uint8_t uuidVal[16];
- } value;
-
- ValueImpl(const ValueImpl& from) :
- typecode(from.typecode), valid(from.valid), refVal(from.refVal), stringVal(from.stringVal),
- objectVal(from.objectVal.get() ? new Object(*(from.objectVal)) : 0),
- mapVal(from.mapVal), vectorVal(from.vectorVal), arrayTypecode(from.arrayTypecode),
- value(from.value) {}
-
- ValueImpl(Typecode t, Typecode at);
- ValueImpl(Typecode t, qpid::framing::Buffer& b);
- ValueImpl(Typecode t);
- static Value* factory(Typecode t, qpid::framing::Buffer& b);
- static Value* factory(Typecode t);
- ~ValueImpl();
-
- void encode(qpid::framing::Buffer& b) const;
-
- Typecode getType() const { return typecode; }
- bool isNull() const { return !valid; }
- void setNull() { valid = false; }
-
- bool isObjectId() const { return typecode == TYPE_REF; }
- const ObjectId& asObjectId() const { return refVal; }
- void setObjectId(const ObjectId& o) { refVal = o; } // TODO
-
- bool isUint() const { return typecode >= TYPE_UINT8 && typecode <= TYPE_UINT32; }
- uint32_t asUint() const { return value.u32; }
- void setUint(uint32_t val) { value.u32 = val; }
-
- bool isInt() const { return typecode >= TYPE_INT8 && typecode <= TYPE_INT32; }
- int32_t asInt() const { return value.s32; }
- void setInt(int32_t val) { value.s32 = val; }
-
- bool isUint64() const { return typecode == TYPE_UINT64 || typecode == TYPE_DELTATIME; }
- uint64_t asUint64() const { return value.u64; }
- void setUint64(uint64_t val) { value.u64 = val; }
-
- bool isInt64() const { return typecode == TYPE_INT64 || typecode == TYPE_ABSTIME; }
- int64_t asInt64() const { return value.s64; }
- void setInt64(int64_t val) { value.s64 = val; }
-
- bool isString() const { return typecode == TYPE_SSTR || typecode == TYPE_LSTR; }
- const char* asString() const { return stringVal.c_str(); }
- void setString(const char* val) { stringVal = val; }
-
- bool isBool() const { return typecode == TYPE_BOOL; }
- bool asBool() const { return value.boolVal; }
- void setBool(bool val) { value.boolVal = val; }
-
- bool isFloat() const { return typecode == TYPE_FLOAT; }
- float asFloat() const { return value.floatVal; }
- void setFloat(float val) { value.floatVal = val; }
-
- bool isDouble() const { return typecode == TYPE_DOUBLE; }
- double asDouble() const { return value.doubleVal; }
- void setDouble(double val) { value.doubleVal = val; }
-
- bool isUuid() const { return typecode == TYPE_UUID; }
- const uint8_t* asUuid() const { return value.uuidVal; }
- void setUuid(const uint8_t* val) { ::memcpy(value.uuidVal, val, 16); }
-
- bool isObject() const { return typecode == TYPE_OBJECT; }
- Object* asObject() const { return objectVal.get(); }
- void setObject(Object* val) { objectVal.reset(val); }
-
- bool isMap() const { return typecode == TYPE_MAP; }
- bool keyInMap(const char* key) const;
- Value* byKey(const char* key);
- const Value* byKey(const char* key) const;
- void deleteKey(const char* key);
- void insert(const char* key, Value* val);
- uint32_t keyCount() const { return mapVal.size(); }
- const char* key(uint32_t idx) const;
-
- bool isList() const { return typecode == TYPE_LIST; }
- uint32_t listItemCount() const { return vectorVal.size(); }
- Value* listItem(uint32_t idx);
- void appendToList(Value* val);
- void deleteListItem(uint32_t idx);
-
- bool isArray() const { return typecode == TYPE_ARRAY; }
- Typecode arrayType() const { return arrayTypecode; }
- uint32_t arrayItemCount() const { return vectorVal.size(); }
- Value* arrayItem(uint32_t idx);
- void appendToArray(Value* val);
- void deleteArrayItem(uint32_t idx);
-
- private:
- void mapToFieldTable(qpid::framing::FieldTable& ft) const;
- void initMap(const qpid::framing::FieldTable& ft);
- };
-}
-}
-
-#endif
-