summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/management/ManagementAgent.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/management/ManagementAgent.h')
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h388
1 files changed, 388 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
new file mode 100644
index 0000000000..81bf542766
--- /dev/null
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -0,0 +1,388 @@
+#ifndef _ManagementAgent_
+#define _ManagementAgent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/BrokerImportExport.h"
+#include "qpid/Options.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/management/ManagementObject.h"
+#include "qpid/management/ManagementEvent.h"
+#include "qpid/management/Manageable.h"
+#include "qmf/org/apache/qpid/broker/Agent.h"
+#include "qmf/org/apache/qpid/broker/Memory.h"
+#include "qpid/sys/MemStat.h"
+#include "qpid/sys/PollableQueue.h"
+#include "qpid/types/Variant.h"
+#include <qpid/framing/AMQFrame.h>
+#include <qpid/framing/ResizableBuffer.h>
+#include <boost/shared_ptr.hpp>
+#include <memory>
+#include <string>
+#include <map>
+
+namespace qpid {
+namespace broker {
+class Connection;
+class ProtocolRegistry;
+}
+namespace sys {
+class Timer;
+}
+namespace management {
+
+class ManagementAgent
+{
+private:
+
+ int threadPoolSize;
+
+public:
+ typedef enum {
+ SEV_EMERG = 0,
+ SEV_ALERT = 1,
+ SEV_CRIT = 2,
+ SEV_ERROR = 3,
+ SEV_WARN = 4,
+ SEV_NOTE = 5,
+ SEV_INFO = 6,
+ SEV_DEBUG = 7,
+ SEV_DEFAULT = 8
+ } severity_t;
+
+
+ ManagementAgent (const bool qmfV1, const bool qmfV2);
+ virtual ~ManagementAgent ();
+
+ /** Called before plugins are initialized */
+ void configure (const std::string& dataDir, bool publish, uint16_t interval,
+ qpid::broker::Broker* broker, int threadPoolSize);
+
+ void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="");
+ void getName(std::string& vendor, std::string& product, std::string& instance);
+ const std::string& getAddress();
+
+ void setInterval(uint16_t _interval) { interval = _interval; }
+ void setExchange(qpid::broker::Exchange::shared_ptr mgmtExchange,
+ qpid::broker::Exchange::shared_ptr directExchange);
+ void setExchangeV2(qpid::broker::Exchange::shared_ptr topicExchange,
+ qpid::broker::Exchange::shared_ptr directExchange);
+
+ int getMaxThreads () { return threadPoolSize; }
+ QPID_BROKER_EXTERN void registerClass (const std::string& packageName,
+ const std::string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ QPID_BROKER_EXTERN void registerEvent (const std::string& packageName,
+ const std::string& eventName,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
+ uint64_t persistId = 0,
+ bool persistent = false);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
+ const std::string& key,
+ bool persistent = false);
+ QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
+ severity_t severity = SEV_DEFAULT);
+ QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
+
+ bool dispatchCommand (qpid::broker::Deliverable& msg,
+ const std::string& routingKey,
+ const framing::FieldTable* args,
+ const bool topic,
+ int qmfVersion);
+
+ /** Disallow a method. Attempts to call it will receive an exception with message. */
+ void disallow(const std::string& className, const std::string& methodName, const std::string& message);
+
+ uint16_t getBootSequence(void) { return bootSequence; }
+ void setBootSequence(uint16_t b) { bootSequence = b; writeData(); }
+
+ const framing::Uuid& getUuid() const { return uuid; }
+ void setUuid(const framing::Uuid& id) { uuid = id; writeData(); }
+
+ static types::Variant::Map toMap(const framing::FieldTable& from);
+
+ class DeletedObject {
+ public:
+ typedef boost::shared_ptr<DeletedObject> shared_ptr;
+ DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2);
+ ~DeletedObject() {};
+ const std::string getKey() const {
+ // used to batch up objects of the same class type
+ return std::string(packageName + std::string(":") + className);
+ }
+
+ private:
+ friend class ManagementAgent;
+
+ std::string packageName;
+ std::string className;
+ std::string objectId;
+
+ std::string encodedV1Config; // qmfv1 properties
+ std::string encodedV1Inst; // qmfv1 statistics
+ qpid::types::Variant::Map encodedV2;
+ };
+
+ typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
+
+private:
+ // Storage for tracking remote management agents, attached via the client
+ // management agent API.
+ //
+ struct RemoteAgent : public Manageable
+ {
+ ManagementAgent& agent;
+ uint32_t brokerBank;
+ uint32_t agentBank;
+ std::string routingKey;
+ ObjectId connectionRef;
+ qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
+ RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
+ ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
+
+ virtual ~RemoteAgent ();
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
+ };
+
+ typedef std::map<ObjectId, boost::shared_ptr<RemoteAgent> > RemoteAgentMap;
+
+ // Storage for known schema classes:
+ //
+ // SchemaClassKey -- Key elements for map lookups
+ // SchemaClassKeyComp -- Comparison class for SchemaClassKey
+ // SchemaClass -- Non-key elements for classes
+ //
+ struct SchemaClassKey
+ {
+ std::string name;
+ uint8_t hash[16];
+
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
+ void encode(framing::Buffer& buffer) const;
+ void decode(framing::Buffer& buffer);
+ uint32_t encodedBufSize() const;
+ };
+
+ struct SchemaClassKeyComp
+ {
+ bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+
+ struct SchemaClass
+ {
+ uint8_t kind;
+ ManagementObject::writeSchemaCall_t writeSchemaCall;
+ std::string data;
+ uint32_t pendingSequence;
+
+ SchemaClass(uint8_t _kind=0, uint32_t seq=0) :
+ kind(_kind), writeSchemaCall(0), pendingSequence(seq) {}
+ SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) :
+ kind(_kind), writeSchemaCall(call), pendingSequence(0) {}
+ bool hasSchema () { return (writeSchemaCall != 0) || !data.empty(); }
+ void appendSchema (framing::Buffer& buf);
+
+ void mapEncode(qpid::types::Variant::Map& _map) const;
+ void mapDecode(const qpid::types::Variant::Map& _map);
+ };
+
+ typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<std::string, ClassMap> PackageMap;
+
+ RemoteAgentMap remoteAgents;
+ PackageMap packages;
+
+ //
+ // Protected by objectLock
+ //
+ ManagementObjectMap managementObjects;
+
+ //
+ // Protected by addLock
+ //
+ ManagementObjectVector newManagementObjects;
+
+ framing::Uuid uuid;
+
+ //
+ // Lock ordering: userLock -> addLock -> objectLock
+ //
+ sys::Mutex userLock;
+ sys::Mutex addLock;
+ sys::Mutex objectLock;
+
+ qpid::broker::Exchange::shared_ptr mExchange;
+ qpid::broker::Exchange::shared_ptr dExchange;
+ qpid::broker::Exchange::shared_ptr v2Topic;
+ qpid::broker::Exchange::shared_ptr v2Direct;
+ std::string dataDir;
+ bool publish;
+ uint16_t interval;
+ qpid::broker::Broker* broker;
+ qpid::sys::Timer* timer;
+ qpid::broker::ProtocolRegistry* protocols;
+ uint16_t bootSequence;
+ uint32_t nextObjectId;
+ uint32_t brokerBank;
+ uint32_t nextRemoteBank;
+ uint32_t nextRequestSequence;
+ bool clientWasAdded;
+ const qpid::sys::AbsTime startTime;
+ bool suppressed;
+
+ typedef std::pair<std::string,std::string> MethodName;
+ typedef std::map<MethodName, std::string> DisallowedMethods;
+ DisallowedMethods disallowed;
+ bool disallowAllV1Methods;
+
+ // Agent name and address
+ qpid::types::Variant::Map attrMap;
+ std::string name_address;
+ std::string vendorNameKey; // "." --> "_"
+ std::string productNameKey; // "." --> "_"
+ std::string instanceNameKey; // "." --> "_"
+
+ // supported management protocol
+ bool qmf1Support;
+ bool qmf2Support;
+
+ // Maximum # of objects allowed in a single V2 response
+ // message.
+ uint32_t maxReplyObjs;
+
+ // list of objects that have been deleted, but have yet to be published
+ // one final time.
+ // Indexed by a string composed of the object's package and class name.
+ // Protected by objectLock.
+ typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
+ PendingDeletedObjsMap pendingDeletedObjs;
+
+ // Pollable queue to serialize event messages
+ typedef std::pair<boost::shared_ptr<broker::Exchange>,
+ broker::Message> ExchangeAndMessage;
+ typedef sys::PollableQueue<ExchangeAndMessage> EventQueue;
+
+ //
+ // Memory statistics object
+ //
+ qmf::org::apache::qpid::broker::Memory::shared_ptr memstat;
+
+ void writeData ();
+ void periodicProcessing (void);
+ void deleteObjectNow(const ObjectId& oid);
+ void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch);
+ void sendBuffer(framing::Buffer& buf,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey);
+ void sendBuffer(framing::Buffer& buf,
+ const std::string& exchange,
+ const std::string& routingKey);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ const std::string& exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
+ void moveNewObjects();
+ bool moveDeletedObjects();
+
+ bool authorizeAgentMessage(qpid::broker::Message& msg);
+ void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false);
+
+ PackageMap::iterator findOrAddPackageLH(std::string name);
+ void addClassLH(uint8_t kind,
+ PackageMap::iterator pIter,
+ const std::string& className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void encodePackageIndication (framing::Buffer& buf,
+ PackageMap::iterator pIter);
+ void encodeClassIndication (framing::Buffer& buf,
+ const std::string packageName,
+ const struct SchemaClassKey key,
+ uint8_t kind);
+ bool bankInUse (uint32_t bank);
+ uint32_t allocateNewBank ();
+ uint32_t assignBankLH (uint32_t requestedPrefix);
+ void deleteOrphanedAgentsLH();
+ void sendCommandComplete(const std::string& replyToKey, uint32_t sequence,
+ uint32_t code = 0, const std::string& text = "OK");
+ void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
+ void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const ObjectId& objectId);
+ void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId);
+ void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const std::string& userId);
+ void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal);
+ void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const std::string& userId, bool viaLocal);
+ void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
+
+
+ size_t validateSchema(framing::Buffer&, uint8_t kind);
+ size_t validateTableSchema(framing::Buffer&);
+ size_t validateEventSchema(framing::Buffer&);
+ ManagementObjectMap::iterator numericFind(const ObjectId& oid);
+
+ std::string summarizeAgents();
+ void debugSnapshot(const char* title);
+ std::auto_ptr<EventQueue> sendQueue;
+};
+
+void setManagementExecutionContext(const broker::Connection&);
+void resetManagementExecutionContext();
+const broker::Connection* getCurrentPublisher();
+}}
+
+#endif /*!_ManagementAgent_*/