diff options
Diffstat (limited to 'qpid/cpp/src/qpid/management/ManagementAgent.h')
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 432 |
1 files changed, 432 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h new file mode 100644 index 0000000000..fb15dc6ed1 --- /dev/null +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -0,0 +1,432 @@ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/BrokerImportExport.h" +#include "qpid/Options.h" +#include "qpid/broker/Exchange.h" +#include "qpid/framing/Uuid.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Timer.h" +#include "qpid/broker/ConnectionToken.h" +#include "qpid/management/ManagementObject.h" +#include "qpid/management/ManagementEvent.h" +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/broker/Agent.h" +#include "qpid/types/Variant.h" +#include <qpid/framing/AMQFrame.h> +#include <qpid/framing/FieldValue.h> +#include <qpid/framing/ResizableBuffer.h> +#include <memory> +#include <string> +#include <map> + +namespace qpid { +namespace broker { +class ConnectionState; +} +namespace management { + +class ManagementAgent +{ +private: + + int threadPoolSize; + +public: + typedef enum { + SEV_EMERG = 0, + SEV_ALERT = 1, + SEV_CRIT = 2, + SEV_ERROR = 3, + SEV_WARN = 4, + SEV_NOTE = 5, + SEV_INFO = 6, + SEV_DEBUG = 7, + SEV_DEFAULT = 8 + } severity_t; + + + ManagementAgent (const bool qmfV1, const bool qmfV2); + virtual ~ManagementAgent (); + + /** Called before plugins are initialized */ + void configure (const std::string& dataDir, uint16_t interval, + qpid::broker::Broker* broker, int threadPoolSize); + /** Called after plugins are initialized. */ + void pluginsInitialized(); + + /** Called by cluster to suppress management output during update. */ + void suppress(bool s) { suppressed = s; } + + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); + void getName(std::string& vendor, std::string& product, std::string& instance); + const std::string& getAddress(); + + void setInterval(uint16_t _interval) { interval = _interval; } + void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange, + qpid::broker::Exchange::shared_ptr directExchange); + void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange, + qpid::broker::Exchange::shared_ptr directExchange); + + int getMaxThreads () { return threadPoolSize; } + QPID_BROKER_EXTERN void registerClass (const std::string& packageName, + const std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN void registerEvent (const std::string& packageName, + const std::string& eventName, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, + uint64_t persistId = 0, + bool persistent = false); + QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object, + const std::string& key, + bool persistent = false); + QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event, + severity_t severity = SEV_DEFAULT); + QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey); + + QPID_BROKER_EXTERN void clusterUpdate(); + + bool dispatchCommand (qpid::broker::Deliverable& msg, + const std::string& routingKey, + const framing::FieldTable* args, + const bool topic, + int qmfVersion); + + /** Disallow a method. Attempts to call it will receive an exception with message. */ + void disallow(const std::string& className, const std::string& methodName, const std::string& message); + + /** Disallow all QMFv1 methods (used in clustered brokers). */ + void disallowV1Methods() { disallowAllV1Methods = true; } + + /** Serialize my schemas as a binary blob into schemaOut */ + void exportSchemas(std::string& schemaOut); + + /** Serialize my remote-agent map as a binary blob into agentsOut */ + void exportAgents(std::string& agentsOut); + + /** Decode a serialized schemas and add to my schema cache */ + void importSchemas(framing::Buffer& inBuf); + + /** Decode a serialized agent map */ + void importAgents(framing::Buffer& inBuf); + + // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers + uint64_t getNextObjectId(void) { return nextObjectId; } + void setNextObjectId(uint64_t o) { nextObjectId = o; } + + uint16_t getBootSequence(void) { return bootSequence; } + void setBootSequence(uint16_t b) { bootSequence = b; writeData(); } + + const framing::Uuid& getUuid() const { return uuid; } + void setUuid(const framing::Uuid& id) { uuid = id; writeData(); } + + // TODO: remove these when Variant API moved into common library. + static types::Variant::Map toMap(const framing::FieldTable& from); + static framing::FieldTable fromMap(const types::Variant::Map& from); + static types::Variant::List toList(const framing::List& from); + static framing::List fromList(const types::Variant::List& from); + static boost::shared_ptr<framing::FieldValue> toFieldValue(const types::Variant& in); + static types::Variant toVariant(const boost::shared_ptr<framing::FieldValue>& val); + + // For Clustering: management objects that have been marked as + // "deleted", but are waiting for their last published object + // update are not visible to the cluster replication code. These + // interfaces allow clustering to gather up all the management + // objects that are deleted in order to allow all clustered + // brokers to publish the same set of deleted objects. + + class DeletedObject { + public: + typedef boost::shared_ptr<DeletedObject> shared_ptr; + DeletedObject(ManagementObject *, bool v1, bool v2); + DeletedObject( const std::string &encoded ); + ~DeletedObject() {}; + void encode( std::string& toBuffer ); + const std::string getKey() const { + // used to batch up objects of the same class type + return std::string(packageName + std::string(":") + className); + } + + private: + friend class ManagementAgent; + + std::string packageName; + std::string className; + std::string objectId; + + std::string encodedV1Config; // qmfv1 properties + std::string encodedV1Inst; // qmfv1 statistics + qpid::types::Variant::Map encodedV2; + }; + + typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList; + + /** returns a snapshot of all currently deleted management objects. */ + void exportDeletedObjects( DeletedObjectList& outList ); + + /** Import a list of deleted objects to send on next publish interval. */ + void importDeletedObjects( const DeletedObjectList& inList ); + +private: + struct Periodic : public qpid::sys::TimerTask + { + ManagementAgent& agent; + + Periodic (ManagementAgent& agent, uint32_t seconds); + virtual ~Periodic (); + void fire (); + }; + + // Storage for tracking remote management agents, attached via the client + // management agent API. + // + struct RemoteAgent : public Manageable + { + ManagementAgent& agent; + uint32_t brokerBank; + uint32_t agentBank; + std::string routingKey; + ObjectId connectionRef; + qmf::org::apache::qpid::broker::Agent* mgmtObject; + RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {} + ManagementObject* GetManagementObject (void) const { return mgmtObject; } + + virtual ~RemoteAgent (); + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); + }; + + typedef std::map<ObjectId, boost::shared_ptr<RemoteAgent> > RemoteAgentMap; + + // Storage for known schema classes: + // + // SchemaClassKey -- Key elements for map lookups + // SchemaClassKeyComp -- Comparison class for SchemaClassKey + // SchemaClass -- Non-key elements for classes + // + struct SchemaClassKey + { + std::string name; + uint8_t hash[16]; + + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); + void encode(framing::Buffer& buffer) const; + void decode(framing::Buffer& buffer); + uint32_t encodedBufSize() const; + }; + + struct SchemaClassKeyComp + { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + + struct SchemaClass + { + uint8_t kind; + ManagementObject::writeSchemaCall_t writeSchemaCall; + std::string data; + uint32_t pendingSequence; + + SchemaClass(uint8_t _kind=0, uint32_t seq=0) : + kind(_kind), writeSchemaCall(0), pendingSequence(seq) {} + SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : + kind(_kind), writeSchemaCall(call), pendingSequence(0) {} + bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); } + void appendSchema (framing::Buffer& buf); + + void mapEncode(qpid::types::Variant::Map& _map) const; + void mapDecode(const qpid::types::Variant::Map& _map); + }; + + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + RemoteAgentMap remoteAgents; + PackageMap packages; + + // + // Protected by userLock + // + ManagementObjectMap managementObjects; + + // + // Protected by addLock + // + ManagementObjectVector newManagementObjects; + + framing::Uuid uuid; + + // + // Lock hierarchy: If a thread needs to take both addLock and userLock, + // it MUST take userLock first, then addLock. + // + sys::Mutex userLock; + sys::Mutex addLock; + + qpid::broker::Exchange::shared_ptr mExchange; + qpid::broker::Exchange::shared_ptr dExchange; + qpid::broker::Exchange::shared_ptr v2Topic; + qpid::broker::Exchange::shared_ptr v2Direct; + std::string dataDir; + uint16_t interval; + qpid::broker::Broker* broker; + qpid::sys::Timer* timer; + uint16_t bootSequence; + uint32_t nextObjectId; + uint32_t brokerBank; + uint32_t nextRemoteBank; + uint32_t nextRequestSequence; + bool clientWasAdded; + const qpid::sys::AbsTime startTime; + bool suppressed; + + typedef std::pair<std::string,std::string> MethodName; + typedef std::map<MethodName, std::string> DisallowedMethods; + DisallowedMethods disallowed; + bool disallowAllV1Methods; + + // Agent name and address + qpid::types::Variant::Map attrMap; + std::string name_address; + std::string vendorNameKey; // "." --> "_" + std::string productNameKey; // "." --> "_" + std::string instanceNameKey; // "." --> "_" + + // supported management protocol + bool qmf1Support; + bool qmf2Support; + + // Maximum # of objects allowed in a single V2 response + // message. + uint32_t maxReplyObjs; + + // list of objects that have been deleted, but have yet to be published + // one final time. + // Indexed by a string composed of the object's package and class name. + // Protected by userLock. + typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap; + PendingDeletedObjsMap pendingDeletedObjs; + +# define MA_BUFFER_SIZE 65536 + char inputBuffer[MA_BUFFER_SIZE]; + char outputBuffer[MA_BUFFER_SIZE]; + char eventBuffer[MA_BUFFER_SIZE]; + framing::ResizableBuffer msgBuffer; + + void writeData (); + void periodicProcessing (void); + void deleteObjectNowLH(const ObjectId& oid); + void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void sendBufferLH(framing::Buffer& buf, + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey); + void sendBufferLH(framing::Buffer& buf, + uint32_t length, + const std::string& exchange, + const std::string& routingKey); + void sendBufferLH(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + qpid::broker::Exchange::shared_ptr exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void sendBufferLH(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map& headers, + const std::string& content_type, + const std::string& exchange, + const std::string& routingKey, + uint64_t ttl_msec = 0); + void moveNewObjectsLH(); + bool moveDeletedObjectsLH(); + + bool authorizeAgentMessageLH(qpid::broker::Message& msg); + void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false); + + PackageMap::iterator findOrAddPackageLH(std::string name); + void addClassLH(uint8_t kind, + PackageMap::iterator pIter, + const std::string& className, + uint8_t* md5Sum, + ManagementObject::writeSchemaCall_t schemaCall); + void encodePackageIndication (framing::Buffer& buf, + PackageMap::iterator pIter); + void encodeClassIndication (framing::Buffer& buf, + const std::string packageName, + const struct SchemaClassKey key, + uint8_t kind); + bool bankInUse (uint32_t bank); + uint32_t allocateNewBank (); + uint32_t assignBankLH (uint32_t requestedPrefix); + void deleteOrphanedAgentsLH(); + void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence, + uint32_t code = 0, const std::string& text = "OK"); + void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false); + void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence); + void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence); + void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); + void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal); + void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal); + void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid); + + + size_t validateSchema(framing::Buffer&, uint8_t kind); + size_t validateTableSchema(framing::Buffer&); + size_t validateEventSchema(framing::Buffer&); + ManagementObjectMap::iterator numericFind(const ObjectId& oid); + + std::string summarizeAgents(); + void debugSnapshot(const char* title); +}; + +void setManagementExecutionContext(const qpid::broker::ConnectionState*); +const qpid::broker::ConnectionState* getManagementExecutionContext(); +}} + +#endif /*!_ManagementAgent_*/ |