diff options
author | Ted Ross <tross@apache.org> | 2010-02-26 23:11:19 +0000 |
---|---|---|
committer | Ted Ross <tross@apache.org> | 2010-02-26 23:11:19 +0000 |
commit | 3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b (patch) | |
tree | 16f81a8ee4c6ff28ea4c3fd6e2aaf9f9a24e71ef | |
parent | c1611f64a7c9dce39c19794dd3d887e3f1815b29 (diff) | |
download | qpid-python-3296ad1ca8f77bf82fe9fd059c5e44580a4f2f4b.tar.gz |
Checkpointing Agent engine code.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qmf-devel0.7@916854 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/include/qmf/Agent.h | 135 | ||||
-rw-r--r-- | qpid/cpp/include/qmf/Notifiable.h | 48 | ||||
-rw-r--r-- | qpid/cpp/include/qmf/Protocol.h | 37 | ||||
-rw-r--r-- | qpid/cpp/include/qmf/engine/Agent.h | 19 | ||||
-rw-r--r-- | qpid/cpp/include/qmf/engine/Console.h | 16 | ||||
-rw-r--r-- | qpid/cpp/include/qmf/engine/Query.h | 1 | ||||
-rw-r--r-- | qpid/cpp/include/qpid/messaging/ListContent.h | 1 | ||||
-rw-r--r-- | qpid/cpp/include/qpid/messaging/MapContent.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qmf.mk | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Agent.cpp | 145 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/Protocol.cpp | 24 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/Agent.cpp | 131 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp | 136 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/BrokerProxyImpl.h | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ConsoleImpl.cpp | 14 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/engine/ConsoleImpl.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/ListContent.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/messaging/MapContent.cpp | 6 | ||||
-rw-r--r-- | qpid/cpp/src/tests/MessagingSessionTests.cpp | 53 |
20 files changed, 550 insertions, 252 deletions
diff --git a/qpid/cpp/Makefile.am b/qpid/cpp/Makefile.am index dcbc4c10c3..7807492627 100644 --- a/qpid/cpp/Makefile.am +++ b/qpid/cpp/Makefile.am @@ -27,7 +27,7 @@ EXTRA_DIST = \ xml/cluster.xml INSTALL-WINDOWS CMakeLists.txt BuildInstallSettings.cmake \ packaging/NSIS -SUBDIRS = managementgen etc src docs/api docs/man examples bindings/qmf +SUBDIRS = managementgen etc src docs/api docs/man examples # Update libtool, if needed. libtool: $(LIBTOOL_DEPS) diff --git a/qpid/cpp/include/qmf/Agent.h b/qpid/cpp/include/qmf/Agent.h index e61cd737d0..d982a05cf5 100644 --- a/qpid/cpp/include/qmf/Agent.h +++ b/qpid/cpp/include/qmf/Agent.h @@ -21,25 +21,29 @@ */ #include "qmf/QmfImportExport.h" +#include "qmf/Notifiable.h" +#include "qpid/sys/Time.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Variant.h" +#include <string> namespace qmf { class AgentImpl; - class Connection; class ObjectId; class AgentObject; - class Value; class Event; - class SchemaObjectClass; + class SchemaClass; + class Query; /** - * AgentListener is used by agents that select the internalStore=false option (see Agent + * AgentHandler is used by agents that select the internalStore=false option (see Agent * constructor) or by agents that wish to provide access control for queries and methods. * * \ingroup qmfapi */ - class AgentListener { - QMF_EXTERN virtual ~AgentListener(); + class AgentHandler { + QMF_EXTERN virtual ~AgentHandler(); /** * allowQuery is called before a query operation is executed. If true is returned @@ -49,7 +53,7 @@ namespace qmf { * @param q The query being requested. * @param userId The authenticated identity of the user requesting the query. */ - virtual bool allowQuery(const Query& q, const char* userId); + virtual bool allowQuery(const Query& q, const std::string& userId); /** * allowMethod is called before a method call is executed. If true is returned @@ -62,8 +66,11 @@ namespace qmf { * @param cls The Schema describing the object being called. * @param userId The authenticated identity of the requesting user. */ - virtual bool allowMethod(const char* name, const Value& args, const ObjectId& oid, - const SchemaObjectClass& cls, const char* userId); + virtual bool allowMethod(const std::string& name, + const qpid::messaging::Variant::Map& args, + const ObjectId& oid, + const SchemaClass& cls, + const std::string& userId); /** * query is called when the agent receives a query request. The handler must invoke @@ -78,11 +85,11 @@ namespace qmf { * @param q The query requested by the console. * @param userId the authenticated identity of the user requesting the query. */ - virtual void query(uint32_t context, const Query& q, const char* userId); + virtual void query(uint32_t context, const Query& q, const std::string& userId); /** * syncStart is called when a console requests a standing query. This function must - * behave exactly like AgentListener::query (i.e. send zero or more responses followed + * behave exactly like AgentHandler::query (i.e. send zero or more responses followed * by a queryComplete) except it then remembers the context and the query and makes * subsequent queryResponse calls whenever appropriate according the the query. * @@ -96,7 +103,7 @@ namespace qmf { * @param q The query requested by the console. * @param userId the authenticated identity of the user requesting the query. */ - virtual void syncStart(uint32_t context, const Query& q, const char* userId); + virtual void syncStart(uint32_t context, const Query& q, const std::string& userId); /** * syncTouch is called when the console that requested a standing query refreshes its @@ -109,7 +116,7 @@ namespace qmf { * @param context The context supplied in a previous call to syncStart. * @param userId The authenticated identity of the requesting user. */ - virtual void syncTouch(uint32_t context, const char* userId); + virtual void syncTouch(uint32_t context, const std::string& userId); /** * syncStop is called when the console that requested a standing query no longer wishes to @@ -121,7 +128,7 @@ namespace qmf { * @param context The context supplied in a previous call to syncStart. * @param userId The authenticated identity of the requesting user. */ - virtual void syncStop(uint32_t context, const char* userId); + virtual void syncStop(uint32_t context, const std::string& userId); /** * methodCall is called when a console invokes a method on a QMF object. The application @@ -138,8 +145,8 @@ namespace qmf { * @param cls The Schema describing the object being called. * @param userId The authenticated identity of the requesting user. */ - virtual void methodCall(uint32_t context, const char* name, Value& args, - const ObjectId& oid, const SchemaObjectClass& cls, const char* userId); + virtual void methodCall(uint32_t context, const std::string& name, qpid::messaging::Variant::Map& args, + const ObjectId& oid, const SchemaClass& cls, const std::string& userId); }; /** @@ -153,7 +160,16 @@ namespace qmf { /** * Create an instance of the Agent class. * - * @param label An optional string label that can be used to identify the agent. + * @param vendor A string identifying the vendor of the agent application. + * This should follow the reverse-domain-name form (i.e. org.apache). + * + * @param product A string identifying the product provided by the vendor. + * + * @param instance A string that uniquely identifies this instance of the agent. + * If zero, the agent will generate a guid for the instance string. + * + * @param domain A string that defines the QMF domain that this agent should join. + * If zero, the agent will join the default QMF domain. * * @param internalStore If true, objects shall be tracked internally by the agent. * If false, the user of the agent must track the objects. @@ -162,13 +178,19 @@ namespace qmf { * individual operations. If the user is tracking the objects, the user code * must implement queries and syncs (standing queries). * - * @param listener A pointer to a class that implements the AgentListener interface. + * @param handler A pointer to a class that implements the AgentHandler interface. * This must be supplied if any of the following conditions are true: * - The agent model contains methods * - The user wishes to individually authorize query and sync operations. * - internalStore = false + * + * @param notifiable A pointer to a class that implements the Notifiable interface. + * This argument is optional (may be supplied as 0). If it is not supplied, + * notification callbacks will not be invoked. */ - QMF_EXTERN Agent(char* label="qmfa", bool internalStore=true, AgentListener* listener=0); + QMF_EXTERN Agent(const std::string& vendor, const std::string& product, const std::string& instance="", + const std::string& domain="", bool internalStore=true, + AgentHandler* handler=0, Notifiable* notifiable=0); /** * Destroy an instance of the Agent class. @@ -176,20 +198,30 @@ namespace qmf { QMF_EXTERN ~Agent(); /** + * Set an attribute for the agent. Attributes are visible to consoles and can be used to find + * agents. + * + * @param name Name of the attribute to be set (or overwritten) + * + * @param value Value (of any variant type) of the attribute + */ + QMF_EXTERN void setAttribute(const std::string& name, const qpid::messaging::Variant& value); + + /** * Set the persistent store file. This file, if specified, is used to store state information * about the Agent. For example, if object-ids must be persistent across restarts of the Agent * program, this file path must be supplied. * * @param path Full path to a file that is both writable and readable by the Agent program. */ - QMF_EXTERN void setStoreDir(const char* path); + QMF_EXTERN void setStoreDir(const std::string& path); /** * Provide a connection (to a Qpid broker) over which the agent can communicate. * * @param conn Pointer to a Connection object. */ - QMF_EXTERN void setConnection(Connection* conn); + QMF_EXTERN void setConnection(qpid::messaging::Connection& conn); /** * Register a class schema (object or event) with the agent. The agent must have a registered @@ -198,30 +230,29 @@ namespace qmf { * * @param cls Pointer to the schema structure describing the class. */ - QMF_EXTERN void registerClass(SchemaObjectClass* cls); - QMF_EXTERN void registerClass(SchemaEventClass* cls); + QMF_EXTERN void registerClass(SchemaClass* cls); /** - * Add an object to the agent (for internal storage mode only). - * - * @param obj Reference to the object to be managed by the agent. + * Invoke the handler (if supplied in the constructor) with events stored in the Agent's work + * queue. This function call is a way of supplying the Agent with a thread on which to run the + * application's handler (the Agent will never invoke the handler on one of its internal threads). * - * @param persistent Iff true, the object ID assigned to the object shall indicate persistence - * (i.e. the object ID shall be the same across restarts of the agent program). - * - * @param oid 64-bit value for the oid (if zero, the agent will assign the value). + * @param limit The maximum number of handler callbacks to invoke during this call. Zero means + * there will be no limit on the number of invocations. * - * @param oidLo 32-bit value for the lower 32-bits of the oid. + * @param timeout The time this call will block if there are no handler events to process. * - * @param oidHi 32-bit value for the upper 32-bits of the oid. + * @return The number of handler events processed. If the timeout expired, the return value will + * be zero. */ - QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent=false, uint64_t oid=0); - QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi); + QMF_EXTERN uint32_t invokeHandler(uint32_t limit=0, qpid::sys::Duration timeout=qpid::sys::TIME_INFINITE); /** - * Allocate an object ID for an object (for external storage mode only). + * Add an object to the agent (for internal storage mode only). + * + * @param obj Reference to the object to be managed by the agent. * - * @param persistent Iff true, the object ID allocated shall indicate persistence + * @param persistent Iff true, the object ID assigned to the object shall indicate persistence * (i.e. the object ID shall be the same across restarts of the agent program). * * @param oid 64-bit value for the oid (if zero, the agent will assign the value). @@ -230,8 +261,8 @@ namespace qmf { * * @param oidHi 32-bit value for the upper 32-bits of the oid. */ - QMF_EXTERN const ObjectId* allocObjectId(bool persistent=false, uint64_t oid=0); - QMF_EXTERN const ObjectId* allocObjectId(bool persistent, uint32_t oidLo, uint32_t oidHi); + QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent=false, uint64_t oid=0); + QMF_EXTERN const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi); /** * Raise a QMF event. @@ -243,42 +274,44 @@ namespace qmf { /** * Provide a response to a query (for external storage mode only). * - * @param context The context value supplied in the query (via the AgentListener interface). + * @param context The context value supplied in the query (via the AgentHandler interface). * * @param object A reference to the agent that matched the query criteria. - * - * @param prop If true, transmit the property attributes of this object. - * - * @param stat If true, transmit the statistic attributes of this object. */ - QMF_EXTERN void queryResponse(uint32_t context, AgentObject& object, bool prop = true, bool stat = true); + QMF_EXTERN void queryResponse(uint32_t context, AgentObject& object); /** * Indicate that a query (or the initial dump of a sync) is complete (for external storage mode only). * - * @param context The context value supplied in the query/sync (via the AgentListener interface). + * @param context The context value supplied in the query/sync (via the AgentHandler interface). */ QMF_EXTERN void queryComplete(uint32_t context); /** * Provide the response to a method call. * - * @param context The context value supplied in the method request (via the AgentListener interface). + * @param context The context value supplied in the method request (via the AgentHandler interface). * * @param args The argument list from the method call. Must include the output arguments (may include * the input arguments). * - * @param status Numerical return status: zero indicates no error, non-zero indicates error. - * - * @param exception Pointer to an exception value. If status is non-zero, the exception value is + * @param exception Pointer to an exception value. If status is non-null, the exception value is * sent to the caller. It is optional (i.e. leave the pointer as 0), or may be * set to any legal value. A string may be supplied, but an unmanaged object of * any schema may also be passed. */ - QMF_EXTERN void methodResponse(uint32_t context, const Value& args, uint32_t status=0, - const Value* exception=0); + QMF_EXTERN void methodResponse(uint32_t context, + const qpid::messaging::Variant::Map& args, + const qpid::messaging::Variant& exception=qpid::messaging::Variant()); private: + /** + * Private copy constructor and assignment operator ensure that objects of this class cannot + * be copied. + */ + Agent(const Agent&); + const Agent& operator=(const Agent&); + AgentImpl* impl; }; diff --git a/qpid/cpp/include/qmf/Notifiable.h b/qpid/cpp/include/qmf/Notifiable.h new file mode 100644 index 0000000000..43f546d9cd --- /dev/null +++ b/qpid/cpp/include/qmf/Notifiable.h @@ -0,0 +1,48 @@ +#ifndef _QmfNotifiable_ +#define _QmfNotifiable_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/QmfImportExport.h" + +namespace qmf { + + /** + * The Notifiable class is an interface that may be used in the application. It provides + * a single callback (notify) that is invoked when the agent or console has events to be + * handled by the application. + * + * This interface should only be used in an application that has more event drivers than + * just a single QMF interface. For example, an application that already uses select or + * poll to control execution can use the notify callback to awaken the select/poll call. + * + * No QMF operations should be performed from the notify callback. It should only be used + * to awaken an application thread that will then perform QMF operations. + * + * \ingroup qmfapi + */ + class Notifiable { + public: + QMF_EXTERN virtual ~Notifiable(); + virtual void notify() = 0; + }; +} + +#endif diff --git a/qpid/cpp/include/qmf/Protocol.h b/qpid/cpp/include/qmf/Protocol.h index c45ce13434..361a7bd283 100644 --- a/qpid/cpp/include/qmf/Protocol.h +++ b/qpid/cpp/include/qmf/Protocol.h @@ -57,6 +57,43 @@ namespace qmf { const static std::string SUBTYPE_SCHEMA_PROPERTY; const static std::string SUBTYPE_SCHEMA_METHOD; + /** + * Message Content Types + */ + const static std::string AMQP_CONTENT_MAP; + const static std::string AMQP_CONTENT_LIST; + + /** + * Application Header Keys + */ + const static std::string APP_OPCODE; + + /** + * QMF Op Codes + */ + const static std::string OP_AGENT_LOCATE_REQUEST; + const static std::string OP_AGENT_LOCATE_RESPONSE; + const static std::string OP_AGENT_HEARTBEAT_INDICATION; + const static std::string OP_QUERY_REQUEST; + const static std::string OP_QUERY_RESPONSE; + const static std::string OP_SUBSCRIBE_REQUEST; + const static std::string OP_SUBSCRIBE_RESPONSE; + const static std::string OP_SUBSCRIBE_CANCEL_INDICATION; + const static std::string OP_SUBSCRIBE_REFRESH_REQUEST; + const static std::string OP_DATA_INDICATION; + const static std::string OP_METHOD_REQUEST; + const static std::string OP_METHOD_RESPONSE; + + /** + * Content type definitions + */ + const static std::string CONTENT_PACKAGE; + const static std::string CONTENT_SCHEMA_ID; + const static std::string CONTENT_SCHEMA_CLASS; + const static std::string CONTENT_OBJECT_ID; + const static std::string CONTENT_DATA; + const static std::string CONTENT_EVENT; + /* const static uint8_t OP_ATTACH_REQUEST = 'A'; const static uint8_t OP_ATTACH_RESPONSE = 'a'; diff --git a/qpid/cpp/include/qmf/engine/Agent.h b/qpid/cpp/include/qmf/engine/Agent.h index 268e53a9a5..4585ce51cc 100644 --- a/qpid/cpp/include/qmf/engine/Agent.h +++ b/qpid/cpp/include/qmf/engine/Agent.h @@ -20,6 +20,7 @@ * under the License. */ +#include <qmf/Notifiable.h> #include <qmf/engine/Schema.h> #include <qmf/engine/ObjectId.h> #include <qmf/engine/Object.h> @@ -65,10 +66,28 @@ namespace engine { */ class Agent { public: + /** + * Declare a type for a notification callback. + */ + typedef void (*notifyCb)(); + Agent(const char* vendor, const char* product, const char* name, const char* domain=0, bool internalStore=true); ~Agent(); /** + * Provide the Agent with a notification callback that is invoked whenever there is new work + * placed on the event queue. + * + * There are two flavors of notification callback: C-style based on the + * type "notifyCb"; and the C++ style based on the Notifiable class. + * The C++ style can be used for C++ wrappers/applications and the + * C-style can be used for C wrappers/applications and also for + * Swig-based script wrappers. + */ + void setNotifyCallback(notifyCb handler); + void setNotifyCallback(Notifiable* handler); + + /** * Set an agent attribute that can be used to describe this agent to consoles. *@param key Null-terminated string that is the name of the attribute. *@param value Variant value (or any API type) of the attribute. diff --git a/qpid/cpp/include/qmf/engine/Console.h b/qpid/cpp/include/qmf/engine/Console.h index 853d0362f3..2f611cecee 100644 --- a/qpid/cpp/include/qmf/engine/Console.h +++ b/qpid/cpp/include/qmf/engine/Console.h @@ -98,8 +98,6 @@ namespace engine { Event* event; // (EVENT_RECEIVED) uint64_t timestamp; // (AGENT_HEARTBEAT) QueryResponse* queryResponse; // (QUERY_COMPLETE) - bool hasProps; - bool hasStats; }; /** @@ -107,21 +105,11 @@ namespace engine { */ struct BrokerEvent { enum EventKind { - BROKER_INFO = 10, - DECLARE_QUEUE = 11, - DELETE_QUEUE = 12, - BIND = 13, - UNBIND = 14, - SETUP_COMPLETE = 15, - STABLE = 16, QUERY_COMPLETE = 17, METHOD_RESPONSE = 18 }; EventKind kind; - char* name; // ([DECLARE|DELETE]_QUEUE, [UN]BIND) - char* exchange; // ([UN]BIND) - char* bindingKey; // ([UN]BIND) void* context; // (QUERY_COMPLETE, METHOD_RESPONSE) QueryResponse* queryResponse; // (QUERY_COMPLETE) MethodResponse* methodResponse; // (METHOD_RESPONSE) @@ -155,10 +143,6 @@ namespace engine { BrokerProxy(Console& console); ~BrokerProxy(); - void sessionOpened(SessionHandle& sh); - void sessionClosed(); - void startProtocol(); - bool getEvent(BrokerEvent& event) const; void popEvent(); diff --git a/qpid/cpp/include/qmf/engine/Query.h b/qpid/cpp/include/qmf/engine/Query.h index 08e4ebf3cc..8954a08285 100644 --- a/qpid/cpp/include/qmf/engine/Query.h +++ b/qpid/cpp/include/qmf/engine/Query.h @@ -53,6 +53,7 @@ namespace engine { private: friend struct QueryImpl; + friend struct BrokerProxyImpl; QueryImpl* impl; }; } diff --git a/qpid/cpp/include/qpid/messaging/ListContent.h b/qpid/cpp/include/qpid/messaging/ListContent.h index 1c4e13716d..f57a920a88 100644 --- a/qpid/cpp/include/qpid/messaging/ListContent.h +++ b/qpid/cpp/include/qpid/messaging/ListContent.h @@ -42,6 +42,7 @@ class ListContent typedef Variant::List::const_reverse_iterator const_reverse_iterator; QPID_CLIENT_EXTERN ListContent(Message&); + QPID_CLIENT_EXTERN ListContent(Message&, const Variant::List&); QPID_CLIENT_EXTERN ~ListContent(); QPID_CLIENT_EXTERN const_iterator begin() const; diff --git a/qpid/cpp/include/qpid/messaging/MapContent.h b/qpid/cpp/include/qpid/messaging/MapContent.h index b05cb31295..3a80a38732 100644 --- a/qpid/cpp/include/qpid/messaging/MapContent.h +++ b/qpid/cpp/include/qpid/messaging/MapContent.h @@ -47,6 +47,7 @@ class MapContent typedef std::map<key_type, Variant>::reverse_iterator reverse_iterator; QPID_CLIENT_EXTERN MapContent(Message&); + QPID_CLIENT_EXTERN MapContent(Message&, const Variant::Map&); QPID_CLIENT_EXTERN ~MapContent(); QPID_CLIENT_EXTERN const_iterator begin() const; diff --git a/qpid/cpp/src/qmf.mk b/qpid/cpp/src/qmf.mk index 34f93c63ed..80e4f5bc46 100644 --- a/qpid/cpp/src/qmf.mk +++ b/qpid/cpp/src/qmf.mk @@ -44,7 +44,9 @@ QMF_ENGINE_API = \ ../include/qmf/engine/Object.h \ ../include/qmf/engine/QmfEngineImportExport.h \ ../include/qmf/engine/Query.h \ - ../include/qmf/engine/Schema.h + ../include/qmf/engine/Schema.h \ + ../include/qmf/Agent.h \ + ../include/qmf/Notifiable.h # ../include/qmf/engine/ObjectId.h @@ -58,6 +60,8 @@ libqmf_la_SOURCES = \ qpid/agent/ManagementAgentImpl.cpp \ qpid/agent/ManagementAgentImpl.h +# qmf/Agent.cpp + libqmfengine_la_SOURCES = \ $(QMF_ENGINE_API) \ qmf/engine/Agent.cpp \ diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp new file mode 100644 index 0000000000..bdaf064f0a --- /dev/null +++ b/qpid/cpp/src/qmf/Agent.cpp @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/Agent.h" +#include "qmf/engine/Agent.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" + +using namespace std; +using namespace qmf; +using namespace qpid::messaging; +using qpid::sys::Duration; + +namespace qmf { + class AgentImpl { + public: + AgentImpl(const string& vendor, const string& product, const string& instance, const string& domain, bool internalStore, + AgentHandler* handler, Notifiable* notifiable); + ~AgentImpl(); + void setAttribute(const string& name, const Variant& value) { agentEngine.setAttr(name.c_str(), value); } + void setStoreDir(const string& path) { agentEngine.setStoreDir(path.c_str()); } + void setConnection(Connection& conn) { agentEngine.setConnection(conn); } + void registerClass(SchemaClass* cls) { agentEngine.registerClass(cls); } + uint32_t invokeHandler(uint32_t limit, Duration timeout); + // const ObjectId* addObject(AgentObject& obj, bool persistent, uint64_t oid); + // const ObjectId* addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi); + void raiseEvent(Event& event); + void queryResponse(uint32_t context, AgentObject& object); + void queryComplete(uint32_t context); + void methodResponse(uint32_t context, const Variant::Map& args, const Variant& exception); + private: + const string vendor; + const string product; + const string instance; + const string domain; + const bool internalStore; + AgentHandler* handler; + Notifiable* notifiable; + engine::Agent agentEngine; + qpid::sys::Mutex lock; + qpid::sys::Condition cond; + }; +} + +AgentImpl::AgentImpl(const string& _vendor, const string& _product, const string& _instance, const string& _domain, + bool _internalStore, AgentHandler* _handler, Notifiable* _notifiable) : + vendor(_vendor), product(_product), instance(_instance.empty() ? "TODO" : _instance), + domain(_domain.empty() ? "default" : _domain), internalStore(_internalStore), + handler(_handler), notifiable(_notifiable), + agentEngine(vendor.c_str(), product.c_str(), instance.c_str(), domain.c_str(), internalStore) +{ +} + +AgentImpl::~AgentImpl() +{ +} + +void AgentImpl::registerClass(SchemaClass* /*cls*/) +{ +} + +uint32_t AgentImpl::invokeHandler(uint32_t limit, Duration timeout) +{ + engine::AgentEvent event; + bool valid; + qpid::sys::AbsTime endTime(qpid::sys::now(), timeout); + + { + qpid::sys::Mutex::ScopedLock l(lock); + valid = agentEngine.getEvent(event); + while (!valid) { + if (!cond.wait(lock, endTime)) + return 0; + valid = agentEngine.getEvent(event); + } + } + + uint32_t count = 0; + while (valid) { + // TODO: Process event + count++; + if (limit > 0 && count == limit) + break; + agentEngine.popEvent(); + valid = agentEngine.getEvent(event); + } + + return count; +} + +// const ObjectId* AgentImpl::addObject(AgentObject& obj, bool persistent, uint64_t oid); +// const ObjectId* AgentImpl::addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi); + +void AgentImpl::raiseEvent(Event& /*event*/) +{ +} + +void AgentImpl::queryResponse(uint32_t /*context*/, AgentObject& /*object*/) +{ +} + +void AgentImpl::queryComplete(uint32_t /*context*/) +{ +} + +void AgentImpl::methodResponse(uint32_t /*context*/, const Variant::Map& /*args*/, const Variant& /*exception*/) +{ +} + + +//================================================================== +// Wrappers +//================================================================== +Agent::Agent(const string& vendor, const string& product, const string& instance, const string& domain, + bool internalStore, AgentHandler* handler, Notifiable* notifiable) { + impl = new AgentImpl(vendor, product, instance, domain, internalStore, handler, notifiable); } +Agent::~Agent() { delete impl; } +void Agent::setAttribute(const string& name, const qpid::messaging::Variant& value) { impl->setAttribute(name, value); } +void Agent::setStoreDir(const string& path) { impl->setStoreDir(path); } +void Agent::setConnection(qpid::messaging::Connection& conn) { impl->setConnection(conn); } +void Agent::registerClass(SchemaClass* cls) { impl->registerClass(cls); } +uint32_t Agent::invokeHandler(uint32_t limit, qpid::sys::Duration timeout) { return impl->invokeHandler(limit, timeout); } +//const ObjectId* Agent::addObject(AgentObject& obj, bool persistent, uint64_t oid); +//const ObjectId* Agent::addObject(AgentObject& obj, bool persistent, uint32_t oidLo, uint32_t oidHi); +void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); } +void Agent::queryResponse(uint32_t context, AgentObject& object) { impl->queryResponse(context, object); } +void Agent::queryComplete(uint32_t context) { impl->queryComplete(context); } +void Agent::methodResponse(uint32_t context, const Variant::Map& args, const Variant& exception) { impl->methodResponse(context, args, exception); } + diff --git a/qpid/cpp/src/qmf/Protocol.cpp b/qpid/cpp/src/qmf/Protocol.cpp index faaa4c567d..518d263080 100644 --- a/qpid/cpp/src/qmf/Protocol.cpp +++ b/qpid/cpp/src/qmf/Protocol.cpp @@ -43,6 +43,30 @@ const string Protocol::SUBTYPES("_subtypes"); const string Protocol::SUBTYPE_SCHEMA_PROPERTY("qmfProperty"); const string Protocol::SUBTYPE_SCHEMA_METHOD("qmfMethod"); +const string Protocol::AMQP_CONTENT_MAP("amqp/map"); +const string Protocol::AMQP_CONTENT_LIST("amqp/list"); + +const string Protocol::APP_OPCODE("qmf.opcode"); + +const string Protocol::OP_AGENT_LOCATE_REQUEST("_agent_locate_request"); +const string Protocol::OP_AGENT_LOCATE_RESPONSE("_agent_locate_response"); +const string Protocol::OP_AGENT_HEARTBEAT_INDICATION("_agent_heartbeat_indication"); +const string Protocol::OP_QUERY_REQUEST("_query_request"); +const string Protocol::OP_QUERY_RESPONSE("_query_response"); +const string Protocol::OP_SUBSCRIBE_REQUEST("_subscribe_request"); +const string Protocol::OP_SUBSCRIBE_RESPONSE("_subscribe_response"); +const string Protocol::OP_SUBSCRIBE_CANCEL_INDICATION("_subscribe_cancel_indication"); +const string Protocol::OP_SUBSCRIBE_REFRESH_REQUEST("_subscribe_refresh_request"); +const string Protocol::OP_DATA_INDICATION("_data_indication"); +const string Protocol::OP_METHOD_REQUEST("_method_request"); +const string Protocol::OP_METHOD_RESPONSE("_method_response"); + +const string Protocol::CONTENT_PACKAGE("_schema_package"); +const string Protocol::CONTENT_SCHEMA_ID("_schema_id"); +const string Protocol::CONTENT_SCHEMA_CLASS("_schema_class"); +const string Protocol::CONTENT_OBJECT_ID("_object_id"); +const string Protocol::CONTENT_DATA("_data"); +const string Protocol::CONTENT_EVENT("_event"); #if 0 bool Protocol::checkHeader(const Message& /*msg*/, string& /*opcode*/, uint32_t* /*seq*/) diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp index 22d53e93b7..c0c8b69bc8 100644 --- a/qpid/cpp/src/qmf/engine/Agent.cpp +++ b/qpid/cpp/src/qmf/engine/Agent.cpp @@ -31,6 +31,7 @@ #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Message.h> +#include <qpid/messaging/MapView.h> #include <string> #include <deque> #include <map> @@ -83,6 +84,8 @@ namespace engine { AgentImpl(const char* vendor, const char* product, const char* name, const char* domain, bool internalStore); ~AgentImpl(); + void setNotifyCallback(Agent::notifyCb handler); + void setNotifyCallback(Notifiable* handler); void setAttr(const char* key, const Variant& value); void setStoreDir(const char* path); void setTransferDir(const char* path); @@ -107,10 +110,12 @@ namespace engine { const string name; const string domain; string directAddr; - map<string, Variant> attrMap; + Variant::Map attrMap; string storeDir; string transferDir; bool internalStore; + Agent::notifyCb notifyHandler; + Notifiable* notifiable; Uuid systemId; uint16_t bootSequence; uint32_t nextContextNum; @@ -156,14 +161,20 @@ namespace engine { AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, const string& key, boost::shared_ptr<Variant::Map> argMap, const SchemaClass* cls); - void handleRcvMessageLH(qpid::messaging::Message& message); + void notify(); + void handleRcvMessageLH(const Message& message); + void handleAgentLocateLH(const Message& message); + void handleQueryRequestLH(const Message& message); + void handleSubscribeRequest(const Message& message); + void handleSubscribeCancel(const Message& message); + void handleSubscribeRefresh(const Message& message); + void handleMethodRequest(const Message& message); void sendPackageIndicationLH(const string& packageName); void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, uint32_t code = 0, const string& text = "OK"); void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); - void handleAttachResponse(Message& msg); void handlePackageRequest(Message& msg); void handleClassQuery(Message& msg); void handleSchemaRequest(Message& msg, uint32_t sequence, @@ -198,12 +209,16 @@ AgentEvent AgentEventImpl::copy() AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) : vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i), + notifyHandler(0), notifiable(0), bootSequence(1), nextContextNum(1), running(true), thread(0) { directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name; if (_d == 0) { directAddr += " { create:always }"; } + attrMap["vendor"] = vendor; + attrMap["product"] = product; + attrMap["name"] = name; } @@ -211,9 +226,22 @@ AgentImpl::~AgentImpl() { } +void AgentImpl::setNotifyCallback(Agent::notifyCb handler) +{ + Mutex::ScopedLock _lock(lock); + notifyHandler = handler; +} + +void AgentImpl::setNotifyCallback(Notifiable* handler) +{ + Mutex::ScopedLock _lock(lock); + notifiable = handler; +} + void AgentImpl::setAttr(const char* key, const Variant& value) { - attrMap.insert(pair<string, Variant>(key, value)); + Mutex::ScopedLock _lock(lock); + attrMap[key] = value; } void AgentImpl::setStoreDir(const char* path) @@ -234,29 +262,6 @@ void AgentImpl::setTransferDir(const char* path) transferDir.clear(); } -/* -void AgentImpl::handleRcvMessage(Message& message) -{ - Buffer inBuffer(message.body, message.length); - uint8_t opcode; - uint32_t sequence; - string replyToExchange(message.replyExchange ? message.replyExchange : ""); - string replyToKey(message.replyKey ? message.replyKey : ""); - string userId(message.userId ? message.userId : ""); - - while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer); - else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); - else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication(); - else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId); - else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId); - else { - QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode); - break; - } - } -} -*/ bool AgentImpl::getEvent(AgentEvent& event) const { @@ -277,9 +282,18 @@ void AgentImpl::popEvent() void AgentImpl::setConnection(Connection& conn) { Mutex::ScopedLock _lock(lock); + + // + // Don't permit the overwriting of an existing connection + // TODO: return an error or throw an exception if an overwrite is attempted. + // if (connection == 0) return; connection = conn; + + // + // Start the Agent thread now that we have a connection to work with. + // thread = new qpid::sys::Thread(*this); } @@ -384,6 +398,61 @@ void AgentImpl::stop() running = false; } +void AgentImpl::handleRcvMessageLH(const Message& message) +{ + Variant::Map headers(message.getHeaders()); + cout << "AgentImpl::handleRcvMessageLH headers=" << headers << endl; + + if (message.getContentType() != Protocol::AMQP_CONTENT_MAP) + return; + + Variant::Map::const_iterator iter = headers.find(Protocol::APP_OPCODE); + if (iter == headers.end()) + return; + string opcode = iter->second.asString(); + + if (opcode == Protocol::OP_AGENT_LOCATE_REQUEST) handleAgentLocateLH(message); + if (opcode == Protocol::OP_QUERY_REQUEST) handleQueryRequestLH(message); + if (opcode == Protocol::OP_SUBSCRIBE_REQUEST) handleSubscribeRequest(message); + if (opcode == Protocol::OP_SUBSCRIBE_CANCEL_INDICATION) handleSubscribeCancel(message); + if (opcode == Protocol::OP_SUBSCRIBE_REFRESH_REQUEST) handleSubscribeRefresh(message); + if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(message); +} + +void AgentImpl::handleAgentLocateLH(const Message& message) +{ + const MapView predicate(message); + + //if (predicateMatches(predicate, attrMap)) { + // sendResponse(message, Protocol::OP_AGENT_LOCATE_RESPONSE, attrMap); + //} +} + +void AgentImpl::handleQueryRequestLH(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleSubscribeRequest(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleSubscribeCancel(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleSubscribeRefresh(const Message& message) +{ + const MapView map(message); +} + +void AgentImpl::handleMethodRequest(const Message& message) +{ + const MapView map(message); +} + AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string&, const string&, const string& key) { AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); @@ -407,8 +476,12 @@ AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, c return event; } -void AgentImpl::handleRcvMessageLH(qpid::messaging::Message& /*msg*/) +void AgentImpl::notify() { + if (notifyHandler != 0) + notifyHandler(); + if (notifiable != 0) + notifiable->notify(); } void AgentImpl::sendPackageIndicationLH(const string& packageName) @@ -471,6 +544,8 @@ void AgentImpl::handleMethodRequest(Message& /*msg*/, uint32_t sequence, const s Agent::Agent(const char* v, const char* p, const char* n, const char* d, bool i) { impl = new AgentImpl(v, p, n, d, i); } Agent::~Agent() { delete impl; } +void Agent::setNotifyCallback(notifyCb handler) { impl->setNotifyCallback(handler); } +void Agent::setNotifyCallback(Notifiable* handler) { impl->setNotifyCallback(handler); } void Agent::setAttr(const char* key, const Variant& value) { impl->setAttr(key, value); } void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); } void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); } diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp index 46ed653576..f76c69b446 100644 --- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp +++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp @@ -19,7 +19,7 @@ #include "qmf/engine/BrokerProxyImpl.h" #include "qmf/engine/ConsoleImpl.h" -#include "qmf/engine/Protocol.h" +#include "qmf/Protocol.h" #include "qpid/Address.h" #include "qpid/sys/SystemInfo.h" #include <qpid/log/Statement.h> @@ -30,7 +30,7 @@ using namespace std; using namespace qmf::engine; -using namespace qpid::framing; +using namespace qpid::messaging; using namespace qpid::sys; namespace { @@ -64,10 +64,6 @@ BrokerEvent BrokerEventImpl::copy() ::memset(&item, 0, sizeof(BrokerEvent)); item.kind = kind; - - STRING_REF(name); - STRING_REF(exchange); - STRING_REF(bindingKey); item.context = context; item.queryResponse = queryResponse.get(); item.methodResponse = methodResponse.get(); @@ -87,63 +83,12 @@ BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) : publicOb seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this))); } -void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/) -{ - Mutex::ScopedLock _lock(lock); - agentList.clear(); - eventQueue.clear(); - xmtQueue.clear(); - eventQueue.push_back(eventDeclareQueue(queueName)); - eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName)); - eventQueue.push_back(eventSetupComplete()); - - // TODO: Store session handle -} - -void BrokerProxyImpl::sessionClosed() -{ - Mutex::ScopedLock _lock(lock); - agentList.clear(); - eventQueue.clear(); - xmtQueue.clear(); -} - -void BrokerProxyImpl::startProtocol() -{ - AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker")); - { - Mutex::ScopedLock _lock(lock); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - agentList[0] = agent; - - requestsOutstanding = 1; - topicBound = false; - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence); - } - - console.impl->eventAgentAdded(agent); -} - -void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) +void BrokerProxyImpl::sendBufferLH(Buffer&, const string&, const string&) { - uint32_t length = buf.getPosition(); - MessageImpl::Ptr message(new MessageImpl); - - buf.reset(); - buf.getRawData(message->body, length); - message->destination = destination; - message->routingKey = routingKey; - message->replyExchange = DIR_EXCHANGE; - message->replyKey = queueName; - - xmtQueue.push_back(message); + // TODO } +/* void BrokerProxyImpl::handleRcvMessage(Message& message) { Buffer inBuffer(message.body, message.length); @@ -153,22 +98,7 @@ void BrokerProxyImpl::handleRcvMessage(Message& message) while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer); } - -bool BrokerProxyImpl::getXmtMessage(Message& item) const -{ - Mutex::ScopedLock _lock(lock); - if (xmtQueue.empty()) - return false; - item = xmtQueue.front()->copy(); - return true; -} - -void BrokerProxyImpl::popXmt() -{ - Mutex::ScopedLock _lock(lock); - if (!xmtQueue.empty()) - xmtQueue.pop_front(); -} +*/ bool BrokerProxyImpl::getEvent(BrokerEvent& event) const { @@ -227,10 +157,6 @@ void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentPr bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent) { - if (query.impl->singleAgent()) { - if (query.impl->agentBank() != agent->getAgentBank()) - return false; - } stringstream key; Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); uint32_t sequence(seqMgr.reserve(queryContext)); @@ -269,7 +195,7 @@ string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const return string(); } -void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, +void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaClass* cls, const string& methodName, const Value* args, void* userContext) { int methodCount = cls->getMethodCount(); @@ -452,43 +378,13 @@ void BrokerProxyImpl::handleEventIndication(Buffer& /*inBuffer*/, uint32_t /*seq void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq) { - SchemaObjectClass* oClassPtr; - SchemaEventClass* eClassPtr; + SchemaClass* classPtr; uint8_t kind = inBuffer.getOctet(); const SchemaClassKey* key; - if (kind == CLASS_OBJECT) { - oClassPtr = SchemaObjectClassImpl::factory(inBuffer); - console.impl->learnClass(oClassPtr); - key = oClassPtr->getClassKey(); - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str()); - - // - // If we have just learned about the org.apache.qpid.broker:agent class, send a get - // request for the current list of agents so we can have it on-hand before we declare - // this session "stable". - // - if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) { - Mutex::ScopedLock _lock(lock); - incOutstandingLH(); - Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE); - uint32_t sequence(seqMgr.reserve()); - Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence); - FieldTable ft; - ft.setString("_class", AGENT_CLASS); - ft.setString("_package", BROKER_PACKAGE); - ft.encode(outBuffer); - sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY); - QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY); - } - } else if (kind == CLASS_EVENT) { - eClassPtr = SchemaEventClassImpl::factory(inBuffer); - console.impl->learnClass(eClassPtr); - key = eClassPtr->getClassKey(); - QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str()); - } - else { - QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind); - } + classPtr = SchemaClassImpl::factory(inBuffer); + console.impl->learnClass(classPtr); + key = classPtr->getClassKey(); + QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str()); } ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat) @@ -496,7 +392,7 @@ ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer)); QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str()); - SchemaObjectClass* schema = console.impl->getSchema(classKey.get()); + SchemaClass* schema = console.impl->getSchema(classKey.get()); if (schema == 0) { QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str()); return ObjectPtr(); @@ -749,12 +645,6 @@ uint32_t AgentProxy::getAgentBank() const { return impl->getAgentBank(); } BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {} BrokerProxy::~BrokerProxy() { delete impl; } -void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); } -void BrokerProxy::sessionClosed() { impl->sessionClosed(); } -void BrokerProxy::startProtocol() { impl->startProtocol(); } -void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } -bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } -void BrokerProxy::popXmt() { impl->popXmt(); } bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); } void BrokerProxy::popEvent() { impl->popEvent(); } uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); } diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h index 031eb698e0..ab45982dfe 100644 --- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h +++ b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h @@ -25,8 +25,6 @@ #include "qmf/engine/SchemaImpl.h" #include "qmf/engine/QueryImpl.h" #include "qmf/engine/SequenceManager.h" -#include "qmf/engine/MessageImpl.h" -#include "qpid/framing/Uuid.h" #include "qpid/messaging/Variant.h" #include "qpid/sys/Mutex.h" #include "boost/shared_ptr.hpp" @@ -80,9 +78,6 @@ namespace engine { struct BrokerEventImpl { typedef boost::shared_ptr<BrokerEventImpl> Ptr; BrokerEvent::EventKind kind; - std::string name; - std::string exchange; - std::string bindingKey; void* context; QueryResponsePtr queryResponse; MethodResponsePtr methodResponse; @@ -123,14 +118,7 @@ namespace engine { BrokerProxyImpl(BrokerProxy& pub, Console& _console); ~BrokerProxyImpl() {} - void sessionOpened(SessionHandle& sh); - void sessionClosed(); - void startProtocol(); - void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey); - void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item) const; - void popXmt(); bool getEvent(BrokerEvent& event) const; void popEvent(); @@ -140,7 +128,7 @@ namespace engine { void sendQuery(const Query& query, void* context, const AgentProxy* agent); bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent); std::string encodeMethodArguments(const SchemaMethod* schema, const qpid::messaging::Variant::Map* args, qpid::framing::Buffer& buffer); - void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context); + void sendMethodRequest(ObjectId* oid, const SchemaClass* cls, const std::string& method, const qpid::messaging::Variant::Map* args, void* context); void addBinding(const std::string& exchange, const std::string& key); void staticRelease() { decOutstanding(); } @@ -153,12 +141,11 @@ namespace engine { mutable qpid::sys::Mutex lock; Console& console; std::string queueName; - qpid::framing::Uuid brokerId; + qpid::messaging::Uuid brokerId; SequenceManager seqMgr; uint32_t requestsOutstanding; bool topicBound; std::map<uint32_t, AgentProxyPtr> agentList; - std::deque<MessageImpl::Ptr> xmtQueue; std::deque<BrokerEventImpl::Ptr> eventQueue; # define MA_BUFFER_SIZE 65536 diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp index c2d1f51f2b..75cfbed822 100644 --- a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp +++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp @@ -18,20 +18,6 @@ */ #include "qmf/engine/ConsoleImpl.h" -#include "qmf/engine/MessageImpl.h" -#include "qmf/engine/SchemaImpl.h" -#include "qmf/engine/Typecode.h" -#include "qmf/engine/ObjectImpl.h" -#include "qmf/engine/ObjectIdImpl.h" -#include "qmf/engine/QueryImpl.h" -#include "qmf/engine/ValueImpl.h" -#include "qmf/engine/Protocol.h" -#include "qmf/engine/SequenceManager.h" -#include "qmf/engine/BrokerProxyImpl.h" -#include <qpid/framing/Buffer.h> -#include <qpid/framing/Uuid.h> -#include <qpid/framing/FieldTable.h> -#include <qpid/framing/FieldValue.h> #include <qpid/log/Statement.h> #include <qpid/sys/Time.h> #include <qpid/sys/SystemInfo.h> diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.h b/qpid/cpp/src/qmf/engine/ConsoleImpl.h index d459e128f9..30b565058b 100644 --- a/qpid/cpp/src/qmf/engine/ConsoleImpl.h +++ b/qpid/cpp/src/qmf/engine/ConsoleImpl.h @@ -21,15 +21,13 @@ */ #include "qmf/engine/Console.h" -#include "qmf/engine/MessageImpl.h" #include "qmf/engine/SchemaImpl.h" #include "qmf/engine/ObjectImpl.h" #include "qmf/engine/ObjectIdImpl.h" #include "qmf/engine/QueryImpl.h" -#include "qmf/engine/Protocol.h" +#include "qmf/Protocol.h" #include "qmf/engine/SequenceManager.h" #include "qmf/engine/BrokerProxyImpl.h" -#include <qpid/framing/Uuid.h> #include <qpid/sys/Mutex.h> #include <qpid/sys/Time.h> #include <qpid/sys/SystemInfo.h> diff --git a/qpid/cpp/src/qpid/messaging/ListContent.cpp b/qpid/cpp/src/qpid/messaging/ListContent.cpp index 0c3ca5fc62..038c1fad0b 100644 --- a/qpid/cpp/src/qpid/messaging/ListContent.cpp +++ b/qpid/cpp/src/qpid/messaging/ListContent.cpp @@ -37,6 +37,11 @@ class ListContentImpl : public Variant } } + ListContentImpl(Message& m, const Variant::List& i) : Variant(i), msg(&m) + { + msg->getContent().clear(); + } + void encode() { qpid::client::amqp0_10::ListCodec codec; @@ -45,6 +50,7 @@ class ListContentImpl : public Variant }; ListContent::ListContent(Message& m) : impl(new ListContentImpl(m)) {} +ListContent::ListContent(Message& m, const Variant::List& i) : impl(new ListContentImpl(m, i)) {} ListContent::~ListContent() { delete impl; } ListContent& ListContent::operator=(const ListContent& l) { *impl = *l.impl; return *this; } diff --git a/qpid/cpp/src/qpid/messaging/MapContent.cpp b/qpid/cpp/src/qpid/messaging/MapContent.cpp index 6dba22be99..1f190b85aa 100644 --- a/qpid/cpp/src/qpid/messaging/MapContent.cpp +++ b/qpid/cpp/src/qpid/messaging/MapContent.cpp @@ -37,6 +37,11 @@ class MapContentImpl : public Variant } } + MapContentImpl(Message& m, const Variant::Map& i) : Variant(i), msg(&m) + { + msg->getContent().clear(); + } + void encode() { qpid::client::amqp0_10::MapCodec codec; @@ -46,6 +51,7 @@ class MapContentImpl : public Variant }; MapContent::MapContent(Message& m) : impl(new MapContentImpl(m)) {} +MapContent::MapContent(Message& m, const Variant::Map& i) : impl(new MapContentImpl(m, i)) {} MapContent::~MapContent() { delete impl; } MapContent& MapContent::operator=(const MapContent& m) { *impl = *m.impl; return *this; } diff --git a/qpid/cpp/src/tests/MessagingSessionTests.cpp b/qpid/cpp/src/tests/MessagingSessionTests.cpp index 9c6f066d64..98b92e809c 100644 --- a/qpid/cpp/src/tests/MessagingSessionTests.cpp +++ b/qpid/cpp/src/tests/MessagingSessionTests.cpp @@ -346,6 +346,25 @@ QPID_AUTO_TEST_CASE(testMapMessage) fix.session.acknowledge(); } +QPID_AUTO_TEST_CASE(testMapMessageWithInitial) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + Message out; + Variant::Map imap; + imap["abc"] = "def"; + imap["pi"] = 3.14f; + MapContent content(out, imap); + content.encode(); + sender.send(out); + Receiver receiver = fix.session.createReceiver(fix.queue); + Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + MapView view(in); + BOOST_CHECK_EQUAL(view["abc"].asString(), "def"); + BOOST_CHECK_EQUAL(view["pi"].asFloat(), 3.14f); + fix.session.acknowledge(); +} + QPID_AUTO_TEST_CASE(testListMessage) { QueueFixture fix; @@ -379,6 +398,40 @@ QPID_AUTO_TEST_CASE(testListMessage) fix.session.acknowledge(); } +QPID_AUTO_TEST_CASE(testListMessageWithInitial) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + Message out; + Variant::List ilist; + ilist.push_back(Variant("abc")); + ilist.push_back(Variant(1234)); + ilist.push_back(Variant("def")); + ilist.push_back(Variant(56.789)); + ListContent content(out, ilist); + content.encode(); + sender.send(out); + Receiver receiver = fix.session.createReceiver(fix.queue); + Message in = receiver.fetch(5 * qpid::sys::TIME_SEC); + ListView view(in); + BOOST_CHECK_EQUAL(view.size(), content.size()); + BOOST_CHECK_EQUAL(view.front().asString(), "abc"); + BOOST_CHECK_EQUAL(view.back().asDouble(), 56.789); + + ListView::const_iterator i = view.begin(); + BOOST_CHECK(i != view.end()); + BOOST_CHECK_EQUAL(i->asString(), "abc"); + BOOST_CHECK(++i != view.end()); + BOOST_CHECK_EQUAL(i->asInt64(), 1234); + BOOST_CHECK(++i != view.end()); + BOOST_CHECK_EQUAL(i->asString(), "def"); + BOOST_CHECK(++i != view.end()); + BOOST_CHECK_EQUAL(i->asDouble(), 56.789); + BOOST_CHECK(++i == view.end()); + + fix.session.acknowledge(); +} + QPID_AUTO_TEST_CASE(testReject) { QueueFixture fix; |