diff options
Diffstat (limited to 'cpp/src/qpid/management/ManagementAgent.h')
-rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 183 |
1 files changed, 68 insertions, 115 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index c7e830dcf5..6de5d1d719 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -26,7 +26,6 @@ #include "qpid/broker/Exchange.h" #include "qpid/framing/Uuid.h" #include "qpid/sys/Mutex.h" -#include "qpid/sys/Timer.h" #include "qpid/broker/ConnectionToken.h" #include "qpid/management/ManagementObject.h" #include "qpid/management/ManagementEvent.h" @@ -34,9 +33,11 @@ #include "qmf/org/apache/qpid/broker/Agent.h" #include "qmf/org/apache/qpid/broker/Memory.h" #include "qpid/sys/MemStat.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> #include <qpid/framing/ResizableBuffer.h> +#include <boost/shared_ptr.hpp> #include <memory> #include <string> #include <map> @@ -45,6 +46,9 @@ namespace qpid { namespace broker { class ConnectionState; } +namespace sys { +class Timer; +} namespace management { class ManagementAgent @@ -73,11 +77,6 @@ public: /** Called before plugins are initialized */ void configure (const std::string& dataDir, bool publish, uint16_t interval, qpid::broker::Broker* broker, int threadPoolSize); - /** Called after plugins are initialized. */ - void pluginsInitialized(); - - /** Called by cluster to suppress management output during update. */ - void suppress(bool s) { suppressed = s; } void setName(const std::string& vendor, const std::string& product, @@ -100,18 +99,16 @@ public: const std::string& eventName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall); - QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, - uint64_t persistId = 0, - bool persistent = false); - QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, - const std::string& key, - bool persistent = false); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, + uint64_t persistId = 0, + bool persistent = false); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, + const std::string& key, + bool persistent = false); QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); - QPID_BROKER_EXTERN void clusterUpdate(); - bool dispatchCommand (qpid::broker::Deliverable& msg, const std::string& routingKey, const framing::FieldTable* args, @@ -121,25 +118,6 @@ public: /** Disallow a method. Attempts to call it will receive an exception with message. */ void disallow(const std::string& className, const std::string& methodName, const std::string& message); - /** Disallow all QMFv1 methods (used in clustered brokers). */ - void disallowV1Methods() { disallowAllV1Methods = true; } - - /** Serialize my schemas as a binary blob into schemaOut */ - void exportSchemas(std::string& schemaOut); - - /** Serialize my remote-agent map as a binary blob into agentsOut */ - void exportAgents(std::string& agentsOut); - - /** Decode a serialized schemas and add to my schema cache */ - void importSchemas(framing::Buffer& inBuf); - - /** Decode a serialized agent map */ - void importAgents(framing::Buffer& inBuf); - - // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers - uint64_t getNextObjectId(void) { return nextObjectId; } - void setNextObjectId(uint64_t o) { nextObjectId = o; } - uint16_t getBootSequence(void) { return bootSequence; } void setBootSequence(uint16_t b) { bootSequence = b; writeData(); } @@ -148,20 +126,11 @@ public: static types::Variant::Map toMap(const framing::FieldTable& from); - // For Clustering: management objects that have been marked as - // "deleted", but are waiting for their last published object - // update are not visible to the cluster replication code. These - // interfaces allow clustering to gather up all the management - // objects that are deleted in order to allow all clustered - // brokers to publish the same set of deleted objects. - class DeletedObject { public: typedef boost::shared_ptr<DeletedObject> shared_ptr; - DeletedObject(ManagementObject *, bool v1, bool v2); - DeletedObject( const std::string &encoded ); + DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2); ~DeletedObject() {}; - void encode( std::string& toBuffer ); const std::string getKey() const { // used to batch up objects of the same class type return std::string(packageName + std::string(":") + className); @@ -181,22 +150,7 @@ public: typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList; - /** returns a snapshot of all currently deleted management objects. */ - void exportDeletedObjects( DeletedObjectList& outList ); - - /** Import a list of deleted objects to send on next publish interval. */ - void importDeletedObjects( const DeletedObjectList& inList ); - private: - struct Periodic : public qpid::sys::TimerTask - { - ManagementAgent& agent; - - Periodic (ManagementAgent& agent, uint32_t seconds); - virtual ~Periodic (); - void fire (); - }; - // Storage for tracking remote management agents, attached via the client // management agent API. // @@ -207,9 +161,9 @@ private: uint32_t agentBank; std::string routingKey; ObjectId connectionRef; - qmf::org::apache::qpid::broker::Agent* mgmtObject; - RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {} - ManagementObject* GetManagementObject (void) const { return mgmtObject; } + qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject; + RemoteAgent(ManagementAgent& _agent) : agent(_agent) {} + ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } virtual ~RemoteAgent (); void mapEncode(qpid::types::Variant::Map& _map) const; @@ -276,7 +230,7 @@ private: PackageMap packages; // - // Protected by userLock + // Protected by objectLock // ManagementObjectMap managementObjects; @@ -288,11 +242,11 @@ private: framing::Uuid uuid; // - // Lock hierarchy: If a thread needs to take both addLock and userLock, - // it MUST take userLock first, then addLock. + // Lock ordering: userLock -> addLock -> objectLock // sys::Mutex userLock; sys::Mutex addLock; + sys::Mutex objectLock; qpid::broker::Exchange::shared_ptr mExchange; qpid::broker::Exchange::shared_ptr dExchange; @@ -335,53 +289,51 @@ private: // list of objects that have been deleted, but have yet to be published // one final time. // Indexed by a string composed of the object's package and class name. - // Protected by userLock. + // Protected by objectLock. typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; PendingDeletedObjsMap pendingDeletedObjs; -# define MA_BUFFER_SIZE 65536 - char inputBuffer[MA_BUFFER_SIZE]; - char outputBuffer[MA_BUFFER_SIZE]; - char eventBuffer[MA_BUFFER_SIZE]; - framing::ResizableBuffer msgBuffer; + // Pollable queue to serialize event messages + typedef std::pair<boost::shared_ptr<broker::Exchange>, + broker::Message> ExchangeAndMessage; + typedef sys::PollableQueue<ExchangeAndMessage> EventQueue; // // Memory statistics object // - qmf::org::apache::qpid::broker::Memory *memstat; + qmf::org::apache::qpid::broker::Memory::shared_ptr memstat; void writeData (); void periodicProcessing (void); - void deleteObjectNowLH(const ObjectId& oid); + void deleteObjectNow(const ObjectId& oid); void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void sendBufferLH(framing::Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey); - void sendBufferLH(framing::Buffer& buf, - uint32_t length, - const std::string& exchange, - const std::string& routingKey); - void sendBufferLH(const std::string& data, - const std::string& cid, - const qpid::types::Variant::Map& headers, - const std::string& content_type, - qpid::broker::Exchange::shared_ptr exchange, - const std::string& routingKey, - uint64_t ttl_msec = 0); - void sendBufferLH(const std::string& data, - const std::string& cid, - const qpid::types::Variant::Map& headers, - const std::string& content_type, - const std::string& exchange, - const std::string& routingKey, - uint64_t ttl_msec = 0); - void moveNewObjectsLH(); - bool moveDeletedObjectsLH(); - - bool authorizeAgentMessageLH(qpid::broker::Message& msg); - void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false); + EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch); + void sendBuffer(framing::Buffer& buf, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey); + void sendBuffer(framing::Buffer& buf, + const std::string& exchange, + const std::string& routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + const std::string& exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void moveNewObjects(); + bool moveDeletedObjects(); + + bool authorizeAgentMessage(qpid::broker::Message& msg); + void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false); PackageMap::iterator findOrAddPackageLH(std::string name); void addClassLH(uint8_t kind, @@ -399,22 +351,22 @@ private: uint32_t allocateNewBank (); uint32_t assignBankLH (uint32_t requestedPrefix); void deleteOrphanedAgentsLH(); - void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence, - uint32_t code = 0, const std::string& text = "OK"); - void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); - void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); - void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); - void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); - void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); - void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); + void sendCommandComplete(const std::string& replyToKey, uint32_t sequence, + uint32_t code = 0, const std::string& text = "OK"); + void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); + void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); + void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); + void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); + void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); size_t validateSchema(framing::Buffer&, uint8_t kind); @@ -424,6 +376,7 @@ private: std::string summarizeAgents(); void debugSnapshot(const char* title); + std::auto_ptr<EventQueue> sendQueue; }; void setManagementExecutionContext(const qpid::broker::ConnectionState*); |