summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-02-26 23:11:19 +0000
committerTed Ross <tross@apache.org>2010-02-26 23:11:19 +0000
commit3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b (patch)
tree16f81a8ee4c6ff28ea4c3fd6e2aaf9f9a24e71ef
parentc1611f64a7c9dce39c19794dd3d887e3f1815b29 (diff)
downloadqpid-python-3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b.tar.gz
Checkpointing Agent engine code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@916854 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/Makefile.am2
-rw-r--r--qpid/cpp/include/qmf/Agent.h135
-rw-r--r--qpid/cpp/include/qmf/Notifiable.h48
-rw-r--r--qpid/cpp/include/qmf/Protocol.h37
-rw-r--r--qpid/cpp/include/qmf/engine/Agent.h19
-rw-r--r--qpid/cpp/include/qmf/engine/Console.h16
-rw-r--r--qpid/cpp/include/qmf/engine/Query.h1
-rw-r--r--qpid/cpp/include/qpid/messaging/ListContent.h1
-rw-r--r--qpid/cpp/include/qpid/messaging/MapContent.h1
-rw-r--r--qpid/cpp/src/qmf.mk6
-rw-r--r--qpid/cpp/src/qmf/Agent.cpp145
-rw-r--r--qpid/cpp/src/qmf/Protocol.cpp24
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp131
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp136
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.h17
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.cpp14
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.h4
-rw-r--r--qpid/cpp/src/qpid/messaging/ListContent.cpp6
-rw-r--r--qpid/cpp/src/qpid/messaging/MapContent.cpp6
-rw-r--r--qpid/cpp/src/tests/MessagingSessionTests.cpp53
20 files changed, 550 insertions, 252 deletions
diff --git a/qpid/cpp/Makefile.am b/qpid/cpp/Makefile.am
index dcbc4c10c3..7807492627 100644
--- a/qpid/cpp/Makefile.am
+++ b/qpid/cpp/Makefile.am
@@ -27,7 +27,7 @@ EXTRA_DIST = \
xml/cluster.xml INSTALL-WINDOWS CMakeLists.txt BuildInstallSettings.cmake \
packaging/NSIS
-SUBDIRS = managementgen etc src docs/api docs/man examples bindings/qmf
+SUBDIRS = managementgen etc src docs/api docs/man examples
# Update libtool, if needed.
libtool: $(LIBTOOL_DEPS)
diff --git a/qpid/cpp/include/qmf/Agent.h b/qpid/cpp/include/qmf/Agent.h
index e61cd737d0..d982a05cf5 100644
--- a/qpid/cpp/include/qmf/Agent.h
+++ b/qpid/cpp/include/qmf/Agent.h
@@ -21,25 +21,29 @@
*/
#include "qmf/QmfImportExport.h"
+#include "qmf/Notifiable.h"
+#include "qpid/sys/Time.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Variant.h"
+#include <string>
namespace qmf {
class AgentImpl;
- class Connection;
class ObjectId;
class AgentObject;
- class Value;
class Event;
- class SchemaObjectClass;
+ class SchemaClass;
+ class Query;
/**
- * AgentListener is used by agents that select the internalStore=false option (see Agent
+ * AgentHandler is used by agents that select the internalStore=false option (see Agent
* constructor) or by agents that wish to provide access control for queries and methods.
*
* \ingroup qmfapi
*/
- class AgentListener {
- QMF_EXTERN virtual ~AgentListener();
+ class AgentHandler {
+ QMF_EXTERN virtual ~AgentHandler();
/**
* allowQuery is called before a query operation is executed. If true is returned
@@ -49,7 +53,7 @@ namespace qmf {
* @param q The query being requested.
* @param userId The authenticated identity of the user requesting the query.
*/
- virtual bool allowQuery(const Query& q, const char* userId);
+ virtual bool allowQuery(const Query& q, const std::string& userId);
/**
* allowMethod is called before a method call is executed. If true is returned
@@ -62,8 +66,11 @@ namespace qmf {
* @param cls The Schema describing the object being called.
* @param userId The authenticated identity of the requesting user.
*/
- virtual bool allowMethod(const char* name, const Value& args, const ObjectId& oid,
- const SchemaObjectClass& cls, const char* userId);
+ virtual bool allowMethod(const std::string& name,
+ const qpid::messaging::Variant::Map& args,
+ const ObjectId& oid,
+ const SchemaClass& cls,
+ const std::string& userId);
/**
* query is called when the agent receives a query request. The handler must invoke
@@ -78,11 +85,11 @@ namespace qmf {
* @param q The query requested by the console.
* @param userId the authenticated identity of the user requesting the query.
*/
- virtual void query(uint32_t context, const Query& q, const char* userId);
+ virtual void query(uint32_t context, const Query& q, const std::string& userId);
/**
* syncStart is called when a console requests a standing query. This function must
- * behave exactly like AgentListener::query (i.e. send zero or more responses followed
+ * behave exactly like AgentHandler::query (i.e. send zero or more responses followed
* by a queryComplete) except it then remembers the context and the query and makes
* subsequent queryResponse calls whenever appropriate according the the query.
*
@@ -96,7 +103,7 @@ namespace qmf {
* @param q The query requested by the console.
* @param userId the authenticated identity of the user requesting the query.
*/
- virtual void syncStart(uint32_t context, const Query& q, const char* userId);
+ virtual void syncStart(uint32_t context, const Query& q, const std::string& userId);
/**
* syncTouch is called when the console that requested a standing query refreshes its
@@ -109,7 +116,7 @@ namespace qmf {
* @param context The context supplied in a previous call to syncStart.
* @param userId The authenticated identity of the requesting user.
*/
- virtual void syncTouch(uint32_t context, const char* userId);
+ virtual void syncTouch(uint32_t context, const std::string& userId);
/**
* syncStop is called when the console that requested a standing query no longer wishes to
@@ -121,7 +128,7 @@ namespace qmf {
* @param context The context supplied in a previous call to syncStart.
* @param userId The authenticated identity of the requesting user.
*/
- virtual void syncStop(uint32_t context, const char* userId);
+ virtual void syncStop(uint32_t context, const std::string& userId);
/**
* methodCall is called when a console invokes a method on a QMF object. The application
@@ -138,8 +145,8 @@ namespace qmf {
* @param cls The Schema describing the object being called.
* @param userId The authenticated identity of the requesting user.
*/
- virtual void methodCall(uint32_t context, const char* name, Value& args,
- const ObjectId& oid, const SchemaObjectClass& cls, const char* userId);
+ virtual void methodCall(uint32_t context, const std::string& name, qpid::messaging::Variant::Map& args,
+ const ObjectId& oid, const SchemaClass& cls, const std::string& userId);
};
/**
@@ -153,7 +160,16 @@ namespace qmf {
/**
* Create an instance of the Agent class.
*
- * @param label An optional string label that can be used to identify the agent.
+ * @param vendor A string identifying the vendor of the agent application.
+ * This should follow the reverse-domain-name form (i.e. org.apache).
+ *
+ * @param product A string identifying the product provided by the vendor.
+ *
+ * @param instance A string that uniquely identifies this instance of the agent.
+ * If zero, the agent will generate a guid for the instance string.
+ *
+ * @param domain A string that defines the QMF domain that this agent should join.
+ * If zero, the agent will join the default QMF domain.
*
* @param internalStore If true, objects shall be tracked internally by the agent.
* If false, the user of the agent must track the objects.
@@ -162,13 +178,19 @@ namespace qmf {
* individual operations. If the user is tracking the objects, the user code
* must implement queries and syncs (standing queries).
*
- * @param listener A pointer to a class that implements the AgentListener interface.
+ * @param handler A pointer to a class that implements the AgentHandler interface.
* This must be supplied if any of the following conditions are true:
* - The agent model contains methods
* - The user wishes to individually authorize query and sync operations.
* - internalStore = false
+ *
+ * @param notifiable A pointer to a class that implements the Notifiable interface.
+ * This argument is optional (may be supplied as 0). If it is not supplied,
+ * notification callbacks will not be invoked.
*/
- QMF_EXTERN Agent(char* label="qmfa", bool internalStore=true, AgentListener* listener=0);
+ QMF_EXTERN Agent(const std::string& vendor, const std::string& product, const std::string& instance="",
+ const std::string& domain="", bool internalStore=true,
+ AgentHandler* handler=0, Notifiable* notifiable=0);
/**
* Destroy an instance of the Agent class.
@@ -176,20 +198,30 @@ namespace qmf {
QMF_EXTERN ~Agent();
/**
+ * Set an attribute for the agent. Attributes are visible to consoles and can be used to find
+ * agents.
+ *
+ * @param name Name of the attribute to be set (or overwritten)
+ *
+ * @param value Value (of any variant type) of the attribute
+ */
+ QMF_EXTERN void setAttribute(const std::string& name, const qpid::messaging::Variant& value);
+
+ /**
* Set the persistent store file. This file, if specified, is used to store state information
* about the Agent. For example, if object-ids must be persistent across restarts of the Agent
* program, this file path must be supplied.
*
* @param path Full path to a file that is both writable and readable by the Agent program.
*/
- QMF_EXTERN void setStoreDir(const char* path);
+ QMF_EXTERN void setStoreDir(const std::string& path);
/**
* Provide a connection (to a Qpid broker) over which the agent can communicate.
*
* @param conn Pointer to a Connection object.
*/
- QMF_EXTERN void setConnection(Connection* conn);
+ QMF_EXTERN void setConnection(qpid::messaging::Connection& conn);
/**
* Register a class schema (object or event) with the agent. The agent must have a registered
@@ -198,30 +230,29 @@ namespace qmf {
*
* @param cls Pointer to the schema structure describing the class.
*/
- QMF_EXTERN void registerClass(SchemaObjectClass* cls);
- QMF_EXTERN void registerClass(SchemaEventClass* cls);
+ QMF_EXTERN void registerClass(SchemaClass* cls);
/**
- * Add an object to the agent (for internal storage mode only).
- *
- * @param obj Reference to the object to be managed by the agent.
+ * Invoke the handler (if supplied in the constructor) with events stored in the Agent's work
+ * queue. This function call is a way of supplying the Agent with a thread on which to run the
+ * application's handler (the Agent will never invoke the handler on one of its internal threads).
*
- * @param persistent Iff true, the object ID assigned to the object shall indicate persistence
- * (i.e. the object ID shall be the same across restarts of the agent program).
- *
- * @param oid 64-bit value for the oid (if zero, the agent will assign the value).
+ * @param limit The maximum number of handler callbacks to invoke during this call. Zero means
+ * there will be no limit on the number of invocations.
*
- * @param oidLo 32-bit value for the lower 32-bits of the oid.
+ * @param timeout The time this call will block if there are no handler events to process.
*
- * @param oidHi 32-bit value for the upper 32-bits of the oid.
+ * @return The number of handler events processed. If the timeout expired, the return value will
+ * be zero.
*/
- QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent=false, uint64_t oid=0);
- QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+ QMF_EXTERN uint32_t invokeHandler(uint32_t limit=0, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE);
/**
- * Allocate an object ID for an object (for external storage mode only).
+ * Add an object to the agent (for internal storage mode only).
+ *
+ * @param obj Reference to the object to be managed by the agent.
*
- * @param persistent Iff true, the object ID allocated shall indicate persistence
+ * @param persistent Iff true, the object ID assigned to the object shall indicate persistence
* (i.e. the object ID shall be the same across restarts of the agent program).
*
* @param oid 64-bit value for the oid (if zero, the agent will assign the value).
@@ -230,8 +261,8 @@ namespace qmf {
*
* @param oidHi 32-bit value for the upper 32-bits of the oid.
*/
- QMF_EXTERN const ObjectId* allocObjectId(bool persistent=false, uint64_t oid=0);
- QMF_EXTERN const ObjectId* allocObjectId(bool persistent, uint32_t oidLo, uint32_t oidHi);
+ QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent=false, uint64_t oid=0);
+ QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
/**
* Raise a QMF event.
@@ -243,42 +274,44 @@ namespace qmf {
/**
* Provide a response to a query (for external storage mode only).
*
- * @param context The context value supplied in the query (via the AgentListener interface).
+ * @param context The context value supplied in the query (via the AgentHandler interface).
*
* @param object A reference to the agent that matched the query criteria.
- *
- * @param prop If true, transmit the property attributes of this object.
- *
- * @param stat If true, transmit the statistic attributes of this object.
*/
- QMF_EXTERN void queryResponse(uint32_t context, AgentObject& object, bool prop = true, bool stat = true);
+ QMF_EXTERN void queryResponse(uint32_t context, AgentObject& object);
/**
* Indicate that a query (or the initial dump of a sync) is complete (for external storage mode only).
*
- * @param context The context value supplied in the query/sync (via the AgentListener interface).
+ * @param context The context value supplied in the query/sync (via the AgentHandler interface).
*/
QMF_EXTERN void queryComplete(uint32_t context);
/**
* Provide the response to a method call.
*
- * @param context The context value supplied in the method request (via the AgentListener interface).
+ * @param context The context value supplied in the method request (via the AgentHandler interface).
*
* @param args The argument list from the method call. Must include the output arguments (may include
* the input arguments).
*
- * @param status Numerical return status: zero indicates no error, non-zero indicates error.
- *
- * @param exception Pointer to an exception value. If status is non-zero, the exception value is
+ * @param exception Pointer to an exception value. If status is non-null, the exception value is
* sent to the caller. It is optional (i.e. leave the pointer as 0), or may be
* set to any legal value. A string may be supplied, but an unmanaged object of
* any schema may also be passed.
*/
- QMF_EXTERN void methodResponse(uint32_t context, const Value& args, uint32_t status=0,
- const Value* exception=0);
+ QMF_EXTERN void methodResponse(uint32_t context,
+ const qpid::messaging::Variant::Map& args,
+ const qpid::messaging::Variant& exception=qpid::messaging::Variant());
private:
+ /**
+ * Private copy constructor and assignment operator ensure that objects of this class cannot
+ * be copied.
+ */
+ Agent(const Agent&);
+ const Agent& operator=(const Agent&);
+
AgentImpl* impl;
};
diff --git a/qpid/cpp/include/qmf/Notifiable.h b/qpid/cpp/include/qmf/Notifiable.h
new file mode 100644
index 0000000000..43f546d9cd
--- /dev/null
+++ b/qpid/cpp/include/qmf/Notifiable.h
@@ -0,0 +1,48 @@
+#ifndef _QmfNotifiable_
+#define _QmfNotifiable_
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/QmfImportExport.h"
+
+namespace qmf {
+
+ /**
+ * The Notifiable class is an interface that may be used in the application. It provides
+ * a single callback (notify) that is invoked when the agent or console has events to be
+ * handled by the application.
+ *
+ * This interface should only be used in an application that has more event drivers than
+ * just a single QMF interface. For example, an application that already uses select or
+ * poll to control execution can use the notify callback to awaken the select/poll call.
+ *
+ * No QMF operations should be performed from the notify callback. It should only be used
+ * to awaken an application thread that will then perform QMF operations.
+ *
+ * \ingroup qmfapi
+ */
+ class Notifiable {
+ public:
+ QMF_EXTERN virtual ~Notifiable();
+ virtual void notify() = 0;
+ };
+}
+
+#endif
diff --git a/qpid/cpp/include/qmf/Protocol.h b/qpid/cpp/include/qmf/Protocol.h
index c45ce13434..361a7bd283 100644
--- a/qpid/cpp/include/qmf/Protocol.h
+++ b/qpid/cpp/include/qmf/Protocol.h
@@ -57,6 +57,43 @@ namespace qmf {
const static std::string SUBTYPE_SCHEMA_PROPERTY;
const static std::string SUBTYPE_SCHEMA_METHOD;
+ /**
+ * Message Content Types
+ */
+ const static std::string AMQP_CONTENT_MAP;
+ const static std::string AMQP_CONTENT_LIST;
+
+ /**
+ * Application Header Keys
+ */
+ const static std::string APP_OPCODE;
+
+ /**
+ * QMF Op Codes
+ */
+ const static std::string OP_AGENT_LOCATE_REQUEST;
+ const static std::string OP_AGENT_LOCATE_RESPONSE;
+ const static std::string OP_AGENT_HEARTBEAT_INDICATION;
+ const static std::string OP_QUERY_REQUEST;
+ const static std::string OP_QUERY_RESPONSE;
+ const static std::string OP_SUBSCRIBE_REQUEST;
+ const static std::string OP_SUBSCRIBE_RESPONSE;
+ const static std::string OP_SUBSCRIBE_CANCEL_INDICATION;
+ const static std::string OP_SUBSCRIBE_REFRESH_REQUEST;
+ const static std::string OP_DATA_INDICATION;
+ const static std::string OP_METHOD_REQUEST;
+ const static std::string OP_METHOD_RESPONSE;
+
+ /**
+ * Content type definitions
+ */
+ const static std::string CONTENT_PACKAGE;
+ const static std::string CONTENT_SCHEMA_ID;
+ const static std::string CONTENT_SCHEMA_CLASS;
+ const static std::string CONTENT_OBJECT_ID;
+ const static std::string CONTENT_DATA;
+ const static std::string CONTENT_EVENT;
+
/*
const static uint8_t OP_ATTACH_REQUEST = 'A';
const static uint8_t OP_ATTACH_RESPONSE = 'a';
diff --git a/qpid/cpp/include/qmf/engine/Agent.h b/qpid/cpp/include/qmf/engine/Agent.h
index 268e53a9a5..4585ce51cc 100644
--- a/qpid/cpp/include/qmf/engine/Agent.h
+++ b/qpid/cpp/include/qmf/engine/Agent.h
@@ -20,6 +20,7 @@
* under the License.
*/
+#include <qmf/Notifiable.h>
#include <qmf/engine/Schema.h>
#include <qmf/engine/ObjectId.h>
#include <qmf/engine/Object.h>
@@ -65,10 +66,28 @@ namespace engine {
*/
class Agent {
public:
+ /**
+ * Declare a type for a notification callback.
+ */
+ typedef void (*notifyCb)();
+
Agent(const char* vendor, const char* product, const char* name, const char* domain=0, bool internalStore=true);
~Agent();
/**
+ * Provide the Agent with a notification callback that is invoked whenever there is new work
+ * placed on the event queue.
+ *
+ * There are two flavors of notification callback: C-style based on the
+ * type "notifyCb"; and the C++ style based on the Notifiable class.
+ * The C++ style can be used for C++ wrappers/applications and the
+ * C-style can be used for C wrappers/applications and also for
+ * Swig-based script wrappers.
+ */
+ void setNotifyCallback(notifyCb handler);
+ void setNotifyCallback(Notifiable* handler);
+
+ /**
* Set an agent attribute that can be used to describe this agent to consoles.
*@param key Null-terminated string that is the name of the attribute.
*@param value Variant value (or any API type) of the attribute.
diff --git a/qpid/cpp/include/qmf/engine/Console.h b/qpid/cpp/include/qmf/engine/Console.h
index 853d0362f3..2f611cecee 100644
--- a/qpid/cpp/include/qmf/engine/Console.h
+++ b/qpid/cpp/include/qmf/engine/Console.h
@@ -98,8 +98,6 @@ namespace engine {
Event* event; // (EVENT_RECEIVED)
uint64_t timestamp; // (AGENT_HEARTBEAT)
QueryResponse* queryResponse; // (QUERY_COMPLETE)
- bool hasProps;
- bool hasStats;
};
/**
@@ -107,21 +105,11 @@ namespace engine {
*/
struct BrokerEvent {
enum EventKind {
- BROKER_INFO = 10,
- DECLARE_QUEUE = 11,
- DELETE_QUEUE = 12,
- BIND = 13,
- UNBIND = 14,
- SETUP_COMPLETE = 15,
- STABLE = 16,
QUERY_COMPLETE = 17,
METHOD_RESPONSE = 18
};
EventKind kind;
- char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND)
- char* exchange; // ([UN]BIND)
- char* bindingKey; // ([UN]BIND)
void* context; // (QUERY_COMPLETE, METHOD_RESPONSE)
QueryResponse* queryResponse; // (QUERY_COMPLETE)
MethodResponse* methodResponse; // (METHOD_RESPONSE)
@@ -155,10 +143,6 @@ namespace engine {
BrokerProxy(Console& console);
~BrokerProxy();
- void sessionOpened(SessionHandle& sh);
- void sessionClosed();
- void startProtocol();
-
bool getEvent(BrokerEvent& event) const;
void popEvent();
diff --git a/qpid/cpp/include/qmf/engine/Query.h b/qpid/cpp/include/qmf/engine/Query.h
index 08e4ebf3cc..8954a08285 100644
--- a/qpid/cpp/include/qmf/engine/Query.h
+++ b/qpid/cpp/include/qmf/engine/Query.h
@@ -53,6 +53,7 @@ namespace engine {
private:
friend struct QueryImpl;
+ friend struct BrokerProxyImpl;
QueryImpl* impl;
};
}
diff --git a/qpid/cpp/include/qpid/messaging/ListContent.h b/qpid/cpp/include/qpid/messaging/ListContent.h
index 1c4e13716d..f57a920a88 100644
--- a/qpid/cpp/include/qpid/messaging/ListContent.h
+++ b/qpid/cpp/include/qpid/messaging/ListContent.h
@@ -42,6 +42,7 @@ class ListContent
typedef Variant::List::const_reverse_iterator const_reverse_iterator;
QPID_CLIENT_EXTERN ListContent(Message&);
+ QPID_CLIENT_EXTERN ListContent(Message&, const Variant::List&);
QPID_CLIENT_EXTERN ~ListContent();
QPID_CLIENT_EXTERN const_iterator begin() const;
diff --git a/qpid/cpp/include/qpid/messaging/MapContent.h b/qpid/cpp/include/qpid/messaging/MapContent.h
index b05cb31295..3a80a38732 100644
--- a/qpid/cpp/include/qpid/messaging/MapContent.h
+++ b/qpid/cpp/include/qpid/messaging/MapContent.h
@@ -47,6 +47,7 @@ class MapContent
typedef std::map<key_type, Variant>::reverse_iterator reverse_iterator;
QPID_CLIENT_EXTERN MapContent(Message&);
+ QPID_CLIENT_EXTERN MapContent(Message&, const Variant::Map&);
QPID_CLIENT_EXTERN ~MapContent();
QPID_CLIENT_EXTERN const_iterator begin() const;
diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk
index 34f93c63ed..80e4f5bc46 100644
--- a/qpid/cpp/src/qmf.mk
+++ b/qpid/cpp/src/qmf.mk
@@ -44,7 +44,9 @@ QMF_ENGINE_API = \
../include/qmf/engine/Object.h \
../include/qmf/engine/QmfEngineImportExport.h \
../include/qmf/engine/Query.h \
- ../include/qmf/engine/Schema.h
+ ../include/qmf/engine/Schema.h \
+ ../include/qmf/Agent.h \
+ ../include/qmf/Notifiable.h
# ../include/qmf/engine/ObjectId.h
@@ -58,6 +60,8 @@ libqmf_la_SOURCES = \
qpid/agent/ManagementAgentImpl.cpp \
qpid/agent/ManagementAgentImpl.h
+# qmf/Agent.cpp
+
libqmfengine_la_SOURCES = \
$(QMF_ENGINE_API) \
qmf/engine/Agent.cpp \
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp
new file mode 100644
index 0000000000..bdaf064f0a
--- /dev/null
+++ b/qpid/cpp/src/qmf/Agent.cpp
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "qmf/Agent.h"
+#include "qmf/engine/Agent.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+
+using namespace std;
+using namespace qmf;
+using namespace qpid::messaging;
+using qpid::sys::Duration;
+
+namespace qmf {
+ class AgentImpl {
+ public:
+ AgentImpl(const string& vendor, const string& product, const string& instance, const string& domain, bool internalStore,
+ AgentHandler* handler, Notifiable* notifiable);
+ ~AgentImpl();
+ void setAttribute(const string& name, const Variant& value) { agentEngine.setAttr(name.c_str(), value); }
+ void setStoreDir(const string& path) { agentEngine.setStoreDir(path.c_str()); }
+ void setConnection(Connection& conn) { agentEngine.setConnection(conn); }
+ void registerClass(SchemaClass* cls) { agentEngine.registerClass(cls); }
+ uint32_t invokeHandler(uint32_t limit, Duration timeout);
+ // const ObjectId* addObject(AgentObject& obj, bool persistent, uint64_t oid);
+ // const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+ void raiseEvent(Event& event);
+ void queryResponse(uint32_t context, AgentObject& object);
+ void queryComplete(uint32_t context);
+ void methodResponse(uint32_t context, const Variant::Map& args, const Variant& exception);
+ private:
+ const string vendor;
+ const string product;
+ const string instance;
+ const string domain;
+ const bool internalStore;
+ AgentHandler* handler;
+ Notifiable* notifiable;
+ engine::Agent agentEngine;
+ qpid::sys::Mutex lock;
+ qpid::sys::Condition cond;
+ };
+}
+
+AgentImpl::AgentImpl(const string& _vendor, const string& _product, const string& _instance, const string& _domain,
+ bool _internalStore, AgentHandler* _handler, Notifiable* _notifiable) :
+ vendor(_vendor), product(_product), instance(_instance.empty() ? "TODO" : _instance),
+ domain(_domain.empty() ? "default" : _domain), internalStore(_internalStore),
+ handler(_handler), notifiable(_notifiable),
+ agentEngine(vendor.c_str(), product.c_str(), instance.c_str(), domain.c_str(), internalStore)
+{
+}
+
+AgentImpl::~AgentImpl()
+{
+}
+
+void AgentImpl::registerClass(SchemaClass* /*cls*/)
+{
+}
+
+uint32_t AgentImpl::invokeHandler(uint32_t limit, Duration timeout)
+{
+ engine::AgentEvent event;
+ bool valid;
+ qpid::sys::AbsTime endTime(qpid::sys::now(), timeout);
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ valid = agentEngine.getEvent(event);
+ while (!valid) {
+ if (!cond.wait(lock, endTime))
+ return 0;
+ valid = agentEngine.getEvent(event);
+ }
+ }
+
+ uint32_t count = 0;
+ while (valid) {
+ // TODO: Process event
+ count++;
+ if (limit > 0 && count == limit)
+ break;
+ agentEngine.popEvent();
+ valid = agentEngine.getEvent(event);
+ }
+
+ return count;
+}
+
+// const ObjectId* AgentImpl::addObject(AgentObject& obj, bool persistent, uint64_t oid);
+// const ObjectId* AgentImpl::addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+
+void AgentImpl::raiseEvent(Event& /*event*/)
+{
+}
+
+void AgentImpl::queryResponse(uint32_t /*context*/, AgentObject& /*object*/)
+{
+}
+
+void AgentImpl::queryComplete(uint32_t /*context*/)
+{
+}
+
+void AgentImpl::methodResponse(uint32_t /*context*/, const Variant::Map& /*args*/, const Variant& /*exception*/)
+{
+}
+
+
+//==================================================================
+// Wrappers
+//==================================================================
+Agent::Agent(const string& vendor, const string& product, const string& instance, const string& domain,
+ bool internalStore, AgentHandler* handler, Notifiable* notifiable) {
+ impl = new AgentImpl(vendor, product, instance, domain, internalStore, handler, notifiable); }
+Agent::~Agent() { delete impl; }
+void Agent::setAttribute(const string& name, const qpid::messaging::Variant& value) { impl->setAttribute(name, value); }
+void Agent::setStoreDir(const string& path) { impl->setStoreDir(path); }
+void Agent::setConnection(qpid::messaging::Connection& conn) { impl->setConnection(conn); }
+void Agent::registerClass(SchemaClass* cls) { impl->registerClass(cls); }
+uint32_t Agent::invokeHandler(uint32_t limit, qpid::sys::Duration timeout) { return impl->invokeHandler(limit, timeout); }
+//const ObjectId* Agent::addObject(AgentObject& obj, bool persistent, uint64_t oid);
+//const ObjectId* Agent::addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi);
+void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
+void Agent::queryResponse(uint32_t context, AgentObject& object) { impl->queryResponse(context, object); }
+void Agent::queryComplete(uint32_t context) { impl->queryComplete(context); }
+void Agent::methodResponse(uint32_t context, const Variant::Map& args, const Variant& exception) { impl->methodResponse(context, args, exception); }
+
diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/Protocol.cpp
index faaa4c567d..518d263080 100644
--- a/qpid/cpp/src/qmf/Protocol.cpp
+++ b/qpid/cpp/src/qmf/Protocol.cpp
@@ -43,6 +43,30 @@ const string Protocol::SUBTYPES("_subtypes");
const string Protocol::SUBTYPE_SCHEMA_PROPERTY("qmfProperty");
const string Protocol::SUBTYPE_SCHEMA_METHOD("qmfMethod");
+const string Protocol::AMQP_CONTENT_MAP("amqp/map");
+const string Protocol::AMQP_CONTENT_LIST("amqp/list");
+
+const string Protocol::APP_OPCODE("qmf.opcode");
+
+const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request");
+const string Protocol::OP_AGENT_LOCATE_RESPONSE("_agent_locate_response");
+const string Protocol::OP_AGENT_HEARTBEAT_INDICATION("_agent_heartbeat_indication");
+const string Protocol::OP_QUERY_REQUEST("_query_request");
+const string Protocol::OP_QUERY_RESPONSE("_query_response");
+const string Protocol::OP_SUBSCRIBE_REQUEST("_subscribe_request");
+const string Protocol::OP_SUBSCRIBE_RESPONSE("_subscribe_response");
+const string Protocol::OP_SUBSCRIBE_CANCEL_INDICATION("_subscribe_cancel_indication");
+const string Protocol::OP_SUBSCRIBE_REFRESH_REQUEST("_subscribe_refresh_request");
+const string Protocol::OP_DATA_INDICATION("_data_indication");
+const string Protocol::OP_METHOD_REQUEST("_method_request");
+const string Protocol::OP_METHOD_RESPONSE("_method_response");
+
+const string Protocol::CONTENT_PACKAGE("_schema_package");
+const string Protocol::CONTENT_SCHEMA_ID("_schema_id");
+const string Protocol::CONTENT_SCHEMA_CLASS("_schema_class");
+const string Protocol::CONTENT_OBJECT_ID("_object_id");
+const string Protocol::CONTENT_DATA("_data");
+const string Protocol::CONTENT_EVENT("_event");
#if 0
bool Protocol::checkHeader(const Message& /*msg*/, string& /*opcode*/, uint32_t* /*seq*/)
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
index 22d53e93b7..c0c8b69bc8 100644
--- a/qpid/cpp/src/qmf/engine/Agent.cpp
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -31,6 +31,7 @@
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Message.h>
+#include <qpid/messaging/MapView.h>
#include <string>
#include <deque>
#include <map>
@@ -83,6 +84,8 @@ namespace engine {
AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore);
~AgentImpl();
+ void setNotifyCallback(Agent::notifyCb handler);
+ void setNotifyCallback(Notifiable* handler);
void setAttr(const char* key, const Variant& value);
void setStoreDir(const char* path);
void setTransferDir(const char* path);
@@ -107,10 +110,12 @@ namespace engine {
const string name;
const string domain;
string directAddr;
- map<string, Variant> attrMap;
+ Variant::Map attrMap;
string storeDir;
string transferDir;
bool internalStore;
+ Agent::notifyCb notifyHandler;
+ Notifiable* notifiable;
Uuid systemId;
uint16_t bootSequence;
uint32_t nextContextNum;
@@ -156,14 +161,20 @@ namespace engine {
AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
const string& key, boost::shared_ptr<Variant::Map> argMap,
const SchemaClass* cls);
- void handleRcvMessageLH(qpid::messaging::Message& message);
+ void notify();
+ void handleRcvMessageLH(const Message& message);
+ void handleAgentLocateLH(const Message& message);
+ void handleQueryRequestLH(const Message& message);
+ void handleSubscribeRequest(const Message& message);
+ void handleSubscribeCancel(const Message& message);
+ void handleSubscribeRefresh(const Message& message);
+ void handleMethodRequest(const Message& message);
void sendPackageIndicationLH(const string& packageName);
void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
uint32_t code = 0, const string& text = "OK");
void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
- void handleAttachResponse(Message& msg);
void handlePackageRequest(Message& msg);
void handleClassQuery(Message& msg);
void handleSchemaRequest(Message& msg, uint32_t sequence,
@@ -198,12 +209,16 @@ AgentEvent AgentEventImpl::copy()
AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
+ notifyHandler(0), notifiable(0),
bootSequence(1), nextContextNum(1), running(true), thread(0)
{
directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
if (_d == 0) {
directAddr += " { create:always }";
}
+ attrMap["vendor"] = vendor;
+ attrMap["product"] = product;
+ attrMap["name"] = name;
}
@@ -211,9 +226,22 @@ AgentImpl::~AgentImpl()
{
}
+void AgentImpl::setNotifyCallback(Agent::notifyCb handler)
+{
+ Mutex::ScopedLock _lock(lock);
+ notifyHandler = handler;
+}
+
+void AgentImpl::setNotifyCallback(Notifiable* handler)
+{
+ Mutex::ScopedLock _lock(lock);
+ notifiable = handler;
+}
+
void AgentImpl::setAttr(const char* key, const Variant& value)
{
- attrMap.insert(pair<string, Variant>(key, value));
+ Mutex::ScopedLock _lock(lock);
+ attrMap[key] = value;
}
void AgentImpl::setStoreDir(const char* path)
@@ -234,29 +262,6 @@ void AgentImpl::setTransferDir(const char* path)
transferDir.clear();
}
-/*
-void AgentImpl::handleRcvMessage(Message& message)
-{
- Buffer inBuffer(message.body, message.length);
- uint8_t opcode;
- uint32_t sequence;
- string replyToExchange(message.replyExchange ? message.replyExchange : "");
- string replyToKey(message.replyKey ? message.replyKey : "");
- string userId(message.userId ? message.userId : "");
-
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
- else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
- else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
- else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
- else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
- else {
- QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode);
- break;
- }
- }
-}
-*/
bool AgentImpl::getEvent(AgentEvent& event) const
{
@@ -277,9 +282,18 @@ void AgentImpl::popEvent()
void AgentImpl::setConnection(Connection& conn)
{
Mutex::ScopedLock _lock(lock);
+
+ //
+ // Don't permit the overwriting of an existing connection
+ // TODO: return an error or throw an exception if an overwrite is attempted.
+ //
if (connection == 0)
return;
connection = conn;
+
+ //
+ // Start the Agent thread now that we have a connection to work with.
+ //
thread = new qpid::sys::Thread(*this);
}
@@ -384,6 +398,61 @@ void AgentImpl::stop()
running = false;
}
+void AgentImpl::handleRcvMessageLH(const Message& message)
+{
+ Variant::Map headers(message.getHeaders());
+ cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl;
+
+ if (message.getContentType() != Protocol::AMQP_CONTENT_MAP)
+ return;
+
+ Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE);
+ if (iter == headers.end())
+ return;
+ string opcode = iter->second.asString();
+
+ if (opcode == Protocol::OP_AGENT_LOCATE_REQUEST) handleAgentLocateLH(message);
+ if (opcode == Protocol::OP_QUERY_REQUEST) handleQueryRequestLH(message);
+ if (opcode == Protocol::OP_SUBSCRIBE_REQUEST) handleSubscribeRequest(message);
+ if (opcode == Protocol::OP_SUBSCRIBE_CANCEL_INDICATION) handleSubscribeCancel(message);
+ if (opcode == Protocol::OP_SUBSCRIBE_REFRESH_REQUEST) handleSubscribeRefresh(message);
+ if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(message);
+}
+
+void AgentImpl::handleAgentLocateLH(const Message& message)
+{
+ const MapView predicate(message);
+
+ //if (predicateMatches(predicate, attrMap)) {
+ // sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap);
+ //}
+}
+
+void AgentImpl::handleQueryRequestLH(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeRequest(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeCancel(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleSubscribeRefresh(const Message& message)
+{
+ const MapView map(message);
+}
+
+void AgentImpl::handleMethodRequest(const Message& message)
+{
+ const MapView map(message);
+}
+
AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key)
{
AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
@@ -407,8 +476,12 @@ AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, c
return event;
}
-void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/)
+void AgentImpl::notify()
{
+ if (notifyHandler != 0)
+ notifyHandler();
+ if (notifiable != 0)
+ notifiable->notify();
}
void AgentImpl::sendPackageIndicationLH(const string& packageName)
@@ -471,6 +544,8 @@ void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const s
Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); }
Agent::~Agent() { delete impl; }
+void Agent::setNotifyCallback(notifyCb handler) { impl->setNotifyCallback(handler); }
+void Agent::setNotifyCallback(Notifiable* handler) { impl->setNotifyCallback(handler); }
void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); }
void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
index 46ed653576..f76c69b446 100644
--- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
@@ -19,7 +19,7 @@
#include "qmf/engine/BrokerProxyImpl.h"
#include "qmf/engine/ConsoleImpl.h"
-#include "qmf/engine/Protocol.h"
+#include "qmf/Protocol.h"
#include "qpid/Address.h"
#include "qpid/sys/SystemInfo.h"
#include <qpid/log/Statement.h>
@@ -30,7 +30,7 @@
using namespace std;
using namespace qmf::engine;
-using namespace qpid::framing;
+using namespace qpid::messaging;
using namespace qpid::sys;
namespace {
@@ -64,10 +64,6 @@ BrokerEvent BrokerEventImpl::copy()
::memset(&item, 0, sizeof(BrokerEvent));
item.kind = kind;
-
- STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
item.context = context;
item.queryResponse = queryResponse.get();
item.methodResponse = methodResponse.get();
@@ -87,63 +83,12 @@ BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) : publicOb
seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
}
-void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-
- // TODO: Store session handle
-}
-
-void BrokerProxyImpl::sessionClosed()
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
-}
-
-void BrokerProxyImpl::startProtocol()
-{
- AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
- {
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- agentList[0] = agent;
-
- requestsOutstanding = 1;
- topicBound = false;
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
- }
-
- console.impl->eventAgentAdded(agent);
-}
-
-void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
+void BrokerProxyImpl::sendBufferLH(Buffer&, const string&, const string&)
{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = DIR_EXCHANGE;
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
+ // TODO
}
+/*
void BrokerProxyImpl::handleRcvMessage(Message& message)
{
Buffer inBuffer(message.body, message.length);
@@ -153,22 +98,7 @@ void BrokerProxyImpl::handleRcvMessage(Message& message)
while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer);
}
-
-bool BrokerProxyImpl::getXmtMessage(Message& item) const
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
+*/
bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
{
@@ -227,10 +157,6 @@ void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentPr
bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
{
- if (query.impl->singleAgent()) {
- if (query.impl->agentBank() != agent->getAgentBank())
- return false;
- }
stringstream key;
Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
uint32_t sequence(seqMgr.reserve(queryContext));
@@ -269,7 +195,7 @@ string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const
return string();
}
-void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls,
+void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaClass* cls,
const string& methodName, const Value* args, void* userContext)
{
int methodCount = cls->getMethodCount();
@@ -452,43 +378,13 @@ void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq
void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
{
- SchemaObjectClass* oClassPtr;
- SchemaEventClass* eClassPtr;
+ SchemaClass* classPtr;
uint8_t kind = inBuffer.getOctet();
const SchemaClassKey* key;
- if (kind == CLASS_OBJECT) {
- oClassPtr = SchemaObjectClassImpl::factory(inBuffer);
- console.impl->learnClass(oClassPtr);
- key = oClassPtr->getClassKey();
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
-
- //
- // If we have just learned about the org.apache.qpid.broker:agent class, send a get
- // request for the current list of agents so we can have it on-hand before we declare
- // this session "stable".
- //
- if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- FieldTable ft;
- ft.setString("_class", AGENT_CLASS);
- ft.setString("_package", BROKER_PACKAGE);
- ft.encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
- }
- } else if (kind == CLASS_EVENT) {
- eClassPtr = SchemaEventClassImpl::factory(inBuffer);
- console.impl->learnClass(eClassPtr);
- key = eClassPtr->getClassKey();
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str());
- }
- else {
- QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
- }
+ classPtr = SchemaClassImpl::factory(inBuffer);
+ console.impl->learnClass(classPtr);
+ key = classPtr->getClassKey();
+ QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
}
ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
@@ -496,7 +392,7 @@ ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq
auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str());
- SchemaObjectClass* schema = console.impl->getSchema(classKey.get());
+ SchemaClass* schema = console.impl->getSchema(classKey.get());
if (schema == 0) {
QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str());
return ObjectPtr();
@@ -749,12 +645,6 @@ uint32_t AgentProxy::getAgentBank() const { return impl->getAgentBank(); }
BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {}
BrokerProxy::~BrokerProxy() { delete impl; }
-void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
-void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
-void BrokerProxy::startProtocol() { impl->startProtocol(); }
-void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void BrokerProxy::popXmt() { impl->popXmt(); }
bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
void BrokerProxy::popEvent() { impl->popEvent(); }
uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
index 031eb698e0..ab45982dfe 100644
--- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
+++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
@@ -25,8 +25,6 @@
#include "qmf/engine/SchemaImpl.h"
#include "qmf/engine/QueryImpl.h"
#include "qmf/engine/SequenceManager.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qpid/framing/Uuid.h"
#include "qpid/messaging/Variant.h"
#include "qpid/sys/Mutex.h"
#include "boost/shared_ptr.hpp"
@@ -80,9 +78,6 @@ namespace engine {
struct BrokerEventImpl {
typedef boost::shared_ptr<BrokerEventImpl> Ptr;
BrokerEvent::EventKind kind;
- std::string name;
- std::string exchange;
- std::string bindingKey;
void* context;
QueryResponsePtr queryResponse;
MethodResponsePtr methodResponse;
@@ -123,14 +118,7 @@ namespace engine {
BrokerProxyImpl(BrokerProxy& pub, Console& _console);
~BrokerProxyImpl() {}
- void sessionOpened(SessionHandle& sh);
- void sessionClosed();
- void startProtocol();
-
void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey);
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
bool getEvent(BrokerEvent& event) const;
void popEvent();
@@ -140,7 +128,7 @@ namespace engine {
void sendQuery(const Query& query, void* context, const AgentProxy* agent);
bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
std::string encodeMethodArguments(const SchemaMethod* schema, const qpid::messaging::Variant::Map* args, qpid::framing::Buffer& buffer);
- void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context);
+ void sendMethodRequest(ObjectId* oid, const SchemaClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context);
void addBinding(const std::string& exchange, const std::string& key);
void staticRelease() { decOutstanding(); }
@@ -153,12 +141,11 @@ namespace engine {
mutable qpid::sys::Mutex lock;
Console& console;
std::string queueName;
- qpid::framing::Uuid brokerId;
+ qpid::messaging::Uuid brokerId;
SequenceManager seqMgr;
uint32_t requestsOutstanding;
bool topicBound;
std::map<uint32_t, AgentProxyPtr> agentList;
- std::deque<MessageImpl::Ptr> xmtQueue;
std::deque<BrokerEventImpl::Ptr> eventQueue;
# define MA_BUFFER_SIZE 65536
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
index c2d1f51f2b..75cfbed822 100644
--- a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
+++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
@@ -18,20 +18,6 @@
*/
#include "qmf/engine/ConsoleImpl.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
-#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
-#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/Protocol.h"
-#include "qmf/engine/SequenceManager.h"
-#include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
#include <qpid/log/Statement.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/SystemInfo.h>
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.h b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
index d459e128f9..30b565058b 100644
--- a/qpid/cpp/src/qmf/engine/ConsoleImpl.h
+++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
@@ -21,15 +21,13 @@
*/
#include "qmf/engine/Console.h"
-#include "qmf/engine/MessageImpl.h"
#include "qmf/engine/SchemaImpl.h"
#include "qmf/engine/ObjectImpl.h"
#include "qmf/engine/ObjectIdImpl.h"
#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/Protocol.h"
+#include "qmf/Protocol.h"
#include "qmf/engine/SequenceManager.h"
#include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Uuid.h>
#include <qpid/sys/Mutex.h>
#include <qpid/sys/Time.h>
#include <qpid/sys/SystemInfo.h>
diff --git a/qpid/cpp/src/qpid/messaging/ListContent.cpp b/qpid/cpp/src/qpid/messaging/ListContent.cpp
index 0c3ca5fc62..038c1fad0b 100644
--- a/qpid/cpp/src/qpid/messaging/ListContent.cpp
+++ b/qpid/cpp/src/qpid/messaging/ListContent.cpp
@@ -37,6 +37,11 @@ class ListContentImpl : public Variant
}
}
+ ListContentImpl(Message& m, const Variant::List& i) : Variant(i), msg(&m)
+ {
+ msg->getContent().clear();
+ }
+
void encode()
{
qpid::client::amqp0_10::ListCodec codec;
@@ -45,6 +50,7 @@ class ListContentImpl : public Variant
};
ListContent::ListContent(Message& m) : impl(new ListContentImpl(m)) {}
+ListContent::ListContent(Message& m, const Variant::List& i) : impl(new ListContentImpl(m, i)) {}
ListContent::~ListContent() { delete impl; }
ListContent& ListContent::operator=(const ListContent& l) { *impl = *l.impl; return *this; }
diff --git a/qpid/cpp/src/qpid/messaging/MapContent.cpp b/qpid/cpp/src/qpid/messaging/MapContent.cpp
index 6dba22be99..1f190b85aa 100644
--- a/qpid/cpp/src/qpid/messaging/MapContent.cpp
+++ b/qpid/cpp/src/qpid/messaging/MapContent.cpp
@@ -37,6 +37,11 @@ class MapContentImpl : public Variant
}
}
+ MapContentImpl(Message& m, const Variant::Map& i) : Variant(i), msg(&m)
+ {
+ msg->getContent().clear();
+ }
+
void encode()
{
qpid::client::amqp0_10::MapCodec codec;
@@ -46,6 +51,7 @@ class MapContentImpl : public Variant
};
MapContent::MapContent(Message& m) : impl(new MapContentImpl(m)) {}
+MapContent::MapContent(Message& m, const Variant::Map& i) : impl(new MapContentImpl(m, i)) {}
MapContent::~MapContent() { delete impl; }
MapContent& MapContent::operator=(const MapContent& m) { *impl = *m.impl; return *this; }
diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp
index 9c6f066d64..98b92e809c 100644
--- a/qpid/cpp/src/tests/MessagingSessionTests.cpp
+++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp
@@ -346,6 +346,25 @@ QPID_AUTO_TEST_CASE(testMapMessage)
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testMapMessageWithInitial)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out;
+ Variant::Map imap;
+ imap["abc"] = "def";
+ imap["pi"] = 3.14f;
+ MapContent content(out, imap);
+ content.encode();
+ sender.send(out);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ MapView view(in);
+ BOOST_CHECK_EQUAL(view["abc"].asString(), "def");
+ BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f);
+ fix.session.acknowledge();
+}
+
QPID_AUTO_TEST_CASE(testListMessage)
{
QueueFixture fix;
@@ -379,6 +398,40 @@ QPID_AUTO_TEST_CASE(testListMessage)
fix.session.acknowledge();
}
+QPID_AUTO_TEST_CASE(testListMessageWithInitial)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out;
+ Variant::List ilist;
+ ilist.push_back(Variant("abc"));
+ ilist.push_back(Variant(1234));
+ ilist.push_back(Variant("def"));
+ ilist.push_back(Variant(56.789));
+ ListContent content(out, ilist);
+ content.encode();
+ sender.send(out);
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(5 * qpid::sys::TIME_SEC);
+ ListView view(in);
+ BOOST_CHECK_EQUAL(view.size(), content.size());
+ BOOST_CHECK_EQUAL(view.front().asString(), "abc");
+ BOOST_CHECK_EQUAL(view.back().asDouble(), 56.789);
+
+ ListView::const_iterator i = view.begin();
+ BOOST_CHECK(i != view.end());
+ BOOST_CHECK_EQUAL(i->asString(), "abc");
+ BOOST_CHECK(++i != view.end());
+ BOOST_CHECK_EQUAL(i->asInt64(), 1234);
+ BOOST_CHECK(++i != view.end());
+ BOOST_CHECK_EQUAL(i->asString(), "def");
+ BOOST_CHECK(++i != view.end());
+ BOOST_CHECK_EQUAL(i->asDouble(), 56.789);
+ BOOST_CHECK(++i == view.end());
+
+ fix.session.acknowledge();
+}
+
QPID_AUTO_TEST_CASE(testReject)
{
QueueFixture fix;