diff options
Diffstat (limited to 'qpid/cpp/src/qpid/agent/ManagementAgentImpl.h')
-rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 298 |
1 files changed, 298 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h new file mode 100644 index 0000000000..bf340777d1 --- /dev/null +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -0,0 +1,298 @@ +#ifndef _qpid_agent_ManagementAgentImpl_ +#define _qpid_agent_ManagementAgentImpl_ + +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/agent/ManagementAgent.h" +#include "qpid/client/Connection.h" +#include "qpid/client/ConnectionSettings.h" +#include "qpid/client/SubscriptionManager.h" +#include "qpid/client/Session.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/client/Message.h" +#include "qpid/client/MessageListener.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/PipeHandle.h" +#include "qpid/sys/Time.h" +#include "qpid/framing/Uuid.h" +#include <iostream> +#include <sstream> +#include <deque> + +namespace qpid { +namespace management { + +class ManagementAgentImpl : public ManagementAgent, public client::MessageListener +{ + public: + + ManagementAgentImpl(); + virtual ~ManagementAgentImpl(); + + // + // Methods from ManagementAgent + // + int getMaxThreads() { return 1; } + void setName(const std::string& vendor, + const std::string& product, + const std::string& instance=""); + void getName(std::string& vendor, std::string& product, std::string& instance); + const std::string& getAddress(); + void init(const std::string& brokerHost = "localhost", + uint16_t brokerPort = 5672, + uint16_t intervalSeconds = 10, + bool useExternalThread = false, + const std::string& storeFile = "", + const std::string& uid = "guest", + const std::string& pwd = "guest", + const std::string& mech = "PLAIN", + const std::string& proto = "tcp"); + void init(const management::ConnectionSettings& settings, + uint16_t intervalSeconds = 10, + bool useExternalThread = false, + const std::string& storeFile = ""); + bool isConnected() { return connected; } + std::string& getLastFailure() { return lastFailure; } + void registerClass(const std::string& packageName, + const std::string& className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + void registerEvent(const std::string& packageName, + const std::string& eventName, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0); + ObjectId addObject(management::ManagementObject* objectPtr, const std::string& key, + bool persistent); + void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT); + uint32_t pollCallbacks(uint32_t callLimit = 0); + int getSignalFd(); + void setSignalCallback(cb_t callback, void* context); + void setSignalCallback(Notifyable& n); + + uint16_t getInterval() { return interval; } + void periodicProcessing(); + + // these next are here to support the hot-wiring of state between clustered brokers + uint64_t getNextObjectId(void) { return nextObjectId; } + void setNextObjectId(uint64_t o) { nextObjectId = o; } + + uint16_t getBootSequence(void) { return bootSequence; } + void setBootSequence(uint16_t b) { bootSequence = b; } + + private: + + struct SchemaClassKey { + std::string name; + uint8_t hash[16]; + }; + + struct SchemaClassKeyComp { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + struct SchemaClass { + management::ManagementObject::writeSchemaCall_t writeSchemaCall; + uint8_t kind; + + SchemaClass(const management::ManagementObject::writeSchemaCall_t call, + const uint8_t _kind) : writeSchemaCall(call), kind(_kind) {} + }; + + struct QueuedMethod { + QueuedMethod(const std::string& _cid, const std::string& _rte, const std::string& _rtk, const std::string& _body, const std::string& _uid) : + cid(_cid), replyToExchange(_rte), replyToKey(_rtk), body(_body), userId(_uid) {} + + std::string cid; + std::string replyToExchange; + std::string replyToKey; + std::string body; + std::string userId; + }; + + typedef std::deque<QueuedMethod*> MethodQueue; + typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; + typedef std::map<std::string, ClassMap> PackageMap; + + PackageMap packages; + AgentAttachment attachment; + + typedef std::map<ObjectId, boost::shared_ptr<ManagementObject> > ObjectMap; + + ObjectMap managementObjects; + ObjectMap newManagementObjects; + MethodQueue methodQueue; + + void received (client::Message& msg); + + qpid::types::Variant::Map attrMap; + std::string name_address; + std::string vendorNameKey; // vendor name with "." --> "_" + std::string productNameKey; // product name with "." --> "_" + std::string instanceNameKey; // agent instance with "." --> "_" + uint16_t interval; + bool extThread; + sys::PipeHandle* pipeHandle; + uint64_t nextObjectId; + cb_t notifyCallback; + void* notifyContext; + Notifyable* notifyable; + bool inCallback; + std::string storeFile; + sys::Mutex agentLock; + sys::Mutex addLock; + framing::Uuid systemId; + client::ConnectionSettings connectionSettings; + bool initialized; + bool connected; + bool useMapMsg; + std::string lastFailure; + std::string topicExchange; + std::string directExchange; + qpid::sys::Duration schemaTimestamp; + + bool publishAllData; + uint32_t requestedBrokerBank; + uint32_t requestedAgentBank; + uint32_t assignedBrokerBank; + uint32_t assignedAgentBank; + uint16_t bootSequence; + + // Maximum # of objects allowed in a single V2 response + // message. + uint32_t maxV2ReplyObjs; + + static const uint8_t DEBUG_OFF = 0; + static const uint8_t DEBUG_CONN = 1; + static const uint8_t DEBUG_PROTO = 2; + static const uint8_t DEBUG_PUBLISH = 3; + +# define MA_BUFFER_SIZE 65536 + char outputBuffer[MA_BUFFER_SIZE]; + char eventBuffer[MA_BUFFER_SIZE]; + + friend class ConnectionThread; + class ConnectionThread : public sys::Runnable + { + typedef boost::shared_ptr<client::SubscriptionManager> shared_ptr; + + bool operational; + ManagementAgentImpl& agent; + framing::Uuid sessionId; + client::Connection connection; + client::Session session; + ConnectionThread::shared_ptr subscriptions; + std::stringstream queueName; + mutable sys::Mutex connLock; + bool shutdown; + bool sleeping; + void run(); + public: + ConnectionThread(ManagementAgentImpl& _agent) : + operational(false), agent(_agent), + shutdown(false), sleeping(false) {} + ~ConnectionThread(); + void sendBuffer(qpid::framing::Buffer& buf, + uint32_t length, + const std::string& exchange, + const std::string& routingKey); + void sendBuffer(const std::string& data, + const std::string& cid, + const qpid::types::Variant::Map headers, + const std::string& exchange, + const std::string& routingKey, + const std::string& contentType="amqp/map", + uint64_t ttl_msec=0); + void sendMessage(qpid::client::Message msg, + const std::string& exchange, + const std::string& routingKey); + void bindToBank(uint32_t brokerBank, uint32_t agentBank); + void close(); + bool isSleeping() const; + }; + + class PublishThread : public sys::Runnable + { + ManagementAgentImpl& agent; + void run(); + bool shutdown; + public: + PublishThread(ManagementAgentImpl& _agent) : + agent(_agent), shutdown(false) {} + void close() { shutdown = true; } + }; + + ConnectionThread connThreadBody; + sys::Thread connThread; + PublishThread pubThreadBody; + sys::Thread pubThread; + + static const std::string storeMagicNumber; + + void startProtocol(); + void storeData(bool requested=false); + void retrieveData(std::string& vendor, std::string& product, std::string& inst); + PackageMap::iterator findOrAddPackage(const std::string& name); + void moveNewObjectsLH(); + void addClassLocal (uint8_t classKind, + PackageMap::iterator pIter, + const std::string& className, + uint8_t* md5Sum, + management::ManagementObject::writeSchemaCall_t schemaCall); + void encodePackageIndication (framing::Buffer& buf, + PackageMap::iterator pIter); + void encodeClassIndication (framing::Buffer& buf, + PackageMap::iterator pIter, + ClassMap::iterator cIter); + void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); + qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname, + const std::string& cname, + const uint8_t *md5Sum, + uint8_t type=ManagementItem::CLASS_KIND_TABLE); + bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); + void sendHeartbeat(); + void sendException(const std::string& replyToExchange, const std::string& replyToKey, const std::string& cid, + const std::string& text, uint32_t code=1); + void handlePackageRequest (qpid::framing::Buffer& inBuffer); + void handleClassQuery (qpid::framing::Buffer& inBuffer); + void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& rte, const std::string& rtk); + void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk, const std::string& userId); + + void handleGetQuery (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk); + void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk); + void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk, const std::string& userId); + void handleConsoleAddedIndication(); + void getHeartbeatContent (qpid::types::Variant::Map& map); +}; + +}} + +#endif /*!_qpid_agent_ManagementAgentImpl_*/ |