diff options
author | Robert Gemmell <robbie@apache.org> | 2015-06-25 10:22:51 +0000 |
---|---|---|
committer | Robert Gemmell <robbie@apache.org> | 2015-06-25 10:22:51 +0000 |
commit | 32ae758bc2e8fd962b66a4ab6341b14009f1907e (patch) | |
tree | 2f4d8174813284a6ea58bb6b7f6520aa92287476 /qpid/cpp/src/qpid/management/ManagementAgent.h | |
parent | 116d91ad7825a98af36a869fc751206fbce0c59f (diff) | |
parent | f7e896076143de4572b4f1f67ef0765125f2498d (diff) | |
download | qpid-python-32ae758bc2e8fd962b66a4ab6341b14009f1907e.tar.gz |
NO-JIRA: create branch for qpid-cpp 0.34 RC process
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-cpp-0.34-rc@1687469 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qpid/management/ManagementAgent.h')
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 388 |
1 files changed, 388 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h new file mode 100644 index 0000000000..81bf542766 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -0,0 +1,388 @@ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/BrokerImportExport.h" +#include "qpid/Options.h" +#include "qpid/broker/Exchange.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/Mutex.h" +#include "qpid/management/ManagementObject.h" +#include "qpid/management/ManagementEvent.h" +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/broker/Agent.h" +#include "qmf/org/apache/qpid/broker/Memory.h" +#include "qpid/sys/MemStat.h" +#include "qpid/sys/PollableQueue.h" +#include "qpid/types/Variant.h" +#include <qpid/framing/AMQFrame.h> +#include <qpid/framing/ResizableBuffer.h> +#include <boost/shared_ptr.hpp> +#include <memory> +#include <string> +#include <map> + +namespace qpid { +namespace broker { +class Connection; +class ProtocolRegistry; +} +namespace sys { +class Timer; +} +namespace management { + +class ManagementAgent +{ +private: + + int threadPoolSize; + +public: + typedef enum { + SEV_EMERG = 0, + SEV_ALERT = 1, + SEV_CRIT = 2, + SEV_ERROR = 3, + SEV_WARN = 4, + SEV_NOTE = 5, + SEV_INFO = 6, + SEV_DEBUG = 7, + SEV_DEFAULT = 8 + } severity_t; + + + ManagementAgent (const bool qmfV1, const bool qmfV2); + virtual ~ManagementAgent (); + + /** Called before plugins are initialized */ + void configure (const std::string& dataDir, bool publish, uint16_t interval, + qpid::broker::Broker* broker, int threadPoolSize); + + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); + void getName(std::string& vendor, std::string& product, std::string& instance); + const std::string& getAddress(); + + void setInterval(uint16_t _interval) { interval = _interval; } + void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange, + qpid::broker::Exchange::shared_ptr directExchange); + void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange, + qpid::broker::Exchange::shared_ptr directExchange); + + int getMaxThreads () { return threadPoolSize; } + QPID_BROKER_EXTERN void registerClass (const std::string& packageName, + const std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN void registerEvent (const std::string& packageName, + const std::string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, + uint64_t persistId = 0, + bool persistent = false); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object, + const std::string& key, + bool persistent = false); + QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, + severity_t severity = SEV_DEFAULT); + QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); + + bool dispatchCommand (qpid::broker::Deliverable& msg, + const std::string& routingKey, + const framing::FieldTable* args, + const bool topic, + int qmfVersion); + + /** Disallow a method. Attempts to call it will receive an exception with message. */ + void disallow(const std::string& className, const std::string& methodName, const std::string& message); + + uint16_t getBootSequence(void) { return bootSequence; } + void setBootSequence(uint16_t b) { bootSequence = b; writeData(); } + + const framing::Uuid& getUuid() const { return uuid; } + void setUuid(const framing::Uuid& id) { uuid = id; writeData(); } + + static types::Variant::Map toMap(const framing::FieldTable& from); + + class DeletedObject { + public: + typedef boost::shared_ptr<DeletedObject> shared_ptr; + DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2); + ~DeletedObject() {}; + const std::string getKey() const { + // used to batch up objects of the same class type + return std::string(packageName + std::string(":") + className); + } + + private: + friend class ManagementAgent; + + std::string packageName; + std::string className; + std::string objectId; + + std::string encodedV1Config; // qmfv1 properties + std::string encodedV1Inst; // qmfv1 statistics + qpid::types::Variant::Map encodedV2; + }; + + typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList; + +private: + // Storage for tracking remote management agents, attached via the client + // management agent API. + // + struct RemoteAgent : public Manageable + { + ManagementAgent& agent; + uint32_t brokerBank; + uint32_t agentBank; + std::string routingKey; + ObjectId connectionRef; + qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject; + RemoteAgent(ManagementAgent& _agent) : agent(_agent) {} + ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; } + + virtual ~RemoteAgent (); + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); + }; + + typedef std::map<ObjectId, boost::shared_ptr<RemoteAgent> > RemoteAgentMap; + + // Storage for known schema classes: + // + // SchemaClassKey -- Key elements for map lookups + // SchemaClassKeyComp -- Comparison class for SchemaClassKey + // SchemaClass -- Non-key elements for classes + // + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); + void encode(framing::Buffer& buffer) const; + void decode(framing::Buffer& buffer); + uint32_t encodedBufSize() const; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + + struct SchemaClass + { + uint8_t kind; + ManagementObject::writeSchemaCall_t writeSchemaCall; + std::string data; + uint32_t pendingSequence; + + SchemaClass(uint8_t _kind=0, uint32_t seq=0) : + kind(_kind), writeSchemaCall(0), pendingSequence(seq) {} + SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : + kind(_kind), writeSchemaCall(call), pendingSequence(0) {} + bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); } + void appendSchema (framing::Buffer& buf); + + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); + }; + + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + RemoteAgentMap remoteAgents; + PackageMap packages; + + // + // Protected by objectLock + // + ManagementObjectMap managementObjects; + + // + // Protected by addLock + // + ManagementObjectVector newManagementObjects; + + framing::Uuid uuid; + + // + // Lock ordering: userLock -> addLock -> objectLock + // + sys::Mutex userLock; + sys::Mutex addLock; + sys::Mutex objectLock; + + qpid::broker::Exchange::shared_ptr mExchange; + qpid::broker::Exchange::shared_ptr dExchange; + qpid::broker::Exchange::shared_ptr v2Topic; + qpid::broker::Exchange::shared_ptr v2Direct; + std::string dataDir; + bool publish; + uint16_t interval; + qpid::broker::Broker* broker; + qpid::sys::Timer* timer; + qpid::broker::ProtocolRegistry* protocols; + uint16_t bootSequence; + uint32_t nextObjectId; + uint32_t brokerBank; + uint32_t nextRemoteBank; + uint32_t nextRequestSequence; + bool clientWasAdded; + const qpid::sys::AbsTime startTime; + bool suppressed; + + typedef std::pair<std::string,std::string> MethodName; + typedef std::map<MethodName, std::string> DisallowedMethods; + DisallowedMethods disallowed; + bool disallowAllV1Methods; + + // Agent name and address + qpid::types::Variant::Map attrMap; + std::string name_address; + std::string vendorNameKey; // "." --> "_" + std::string productNameKey; // "." --> "_" + std::string instanceNameKey; // "." --> "_" + + // supported management protocol + bool qmf1Support; + bool qmf2Support; + + // Maximum # of objects allowed in a single V2 response + // message. + uint32_t maxReplyObjs; + + // list of objects that have been deleted, but have yet to be published + // one final time. + // Indexed by a string composed of the object's package and class name. + // Protected by objectLock. + typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; + PendingDeletedObjsMap pendingDeletedObjs; + + // Pollable queue to serialize event messages + typedef std::pair<boost::shared_ptr<broker::Exchange>, + broker::Message> ExchangeAndMessage; + typedef sys::PollableQueue<ExchangeAndMessage> EventQueue; + + // + // Memory statistics object + // + qmf::org::apache::qpid::broker::Memory::shared_ptr memstat; + + void writeData (); + void periodicProcessing (void); + void deleteObjectNow(const ObjectId& oid); + void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch); + void sendBuffer(framing::Buffer& buf, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey); + void sendBuffer(framing::Buffer& buf, + const std::string& exchange, + const std::string& routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + const std::string& exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void moveNewObjects(); + bool moveDeletedObjects(); + + bool authorizeAgentMessage(qpid::broker::Message& msg); + void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false); + + PackageMap::iterator findOrAddPackageLH(std::string name); + void addClassLH(uint8_t kind, + PackageMap::iterator pIter, + const std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void encodePackageIndication (framing::Buffer& buf, + PackageMap::iterator pIter); + void encodeClassIndication (framing::Buffer& buf, + const std::string packageName, + const struct SchemaClassKey key, + uint8_t kind); + bool bankInUse (uint32_t bank); + uint32_t allocateNewBank (); + uint32_t assignBankLH (uint32_t requestedPrefix); + void deleteOrphanedAgentsLH(); + void sendCommandComplete(const std::string& replyToKey, uint32_t sequence, + uint32_t code = 0, const std::string& text = "OK"); + void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); + void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); + void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const ObjectId& objectId); + void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId); + void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId); + void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal); + void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal); + void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); + + + size_t validateSchema(framing::Buffer&, uint8_t kind); + size_t validateTableSchema(framing::Buffer&); + size_t validateEventSchema(framing::Buffer&); + ManagementObjectMap::iterator numericFind(const ObjectId& oid); + + std::string summarizeAgents(); + void debugSnapshot(const char* title); + std::auto_ptr<EventQueue> sendQueue; +}; + +void setManagementExecutionContext(const broker::Connection&); +void resetManagementExecutionContext(); +const broker::Connection* getCurrentPublisher(); +}} + +#endif /*!_ManagementAgent_*/ |