diff options
Diffstat (limited to 'M4-RCs/qpid/cpp/src/qpid/management/ManagementBroker.h')
-rw-r--r-- | M4-RCs/qpid/cpp/src/qpid/management/ManagementBroker.h | 235 |
1 files changed, 0 insertions, 235 deletions
diff --git a/M4-RCs/qpid/cpp/src/qpid/management/ManagementBroker.h b/M4-RCs/qpid/cpp/src/qpid/management/ManagementBroker.h deleted file mode 100644 index 59dfb98596..0000000000 --- a/M4-RCs/qpid/cpp/src/qpid/management/ManagementBroker.h +++ /dev/null @@ -1,235 +0,0 @@ -#ifndef _ManagementBroker_ -#define _ManagementBroker_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/Options.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/Timer.h" -#include "qpid/framing/Uuid.h" -#include "qpid/sys/Mutex.h" -#include "qpid/broker/ConnectionToken.h" -#include "qpid/agent/ManagementAgent.h" -#include "ManagementObject.h" -#include "Manageable.h" -#include "qmf/org/apache/qpid/broker/Agent.h" -#include <qpid/framing/AMQFrame.h> - -namespace qpid { -namespace management { - -class ManagementBroker : public ManagementAgent -{ -private: - - int threadPoolSize; - -public: - - ManagementBroker (); - virtual ~ManagementBroker (); - - void configure (const std::string& dataDir, uint16_t interval, - qpid::broker::Broker* broker, int threadPoolSize); - void setInterval (uint16_t _interval) { interval = _interval; } - void setExchange (qpid::broker::Exchange::shared_ptr mgmtExchange, - qpid::broker::Exchange::shared_ptr directExchange); - int getMaxThreads () { return threadPoolSize; } - void registerClass (const std::string& packageName, - const std::string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void registerEvent (const std::string& packageName, - const std::string& eventName, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - ObjectId addObject (ManagementObject* object, - uint64_t persistId = 0); - void raiseEvent(const ManagementEvent& event, severity_t severity = SEV_DEFAULT); - void clientAdded (const std::string& routingKey); - bool dispatchCommand (qpid::broker::Deliverable& msg, - const std::string& routingKey, - const framing::FieldTable* args); - const framing::Uuid& getUuid() const { return uuid; } - - // Stubs for remote management agent calls - void init(const std::string&, uint16_t, uint16_t, bool, - const std::string&, const std::string&, const std::string&, - const std::string&, const std::string&) { assert(0); } - void init(const client::ConnectionSettings&, uint16_t, bool, const std::string&) { assert(0); } - uint32_t pollCallbacks (uint32_t) { assert(0); return 0; } - int getSignalFd () { assert(0); return -1; } - -private: - friend class ManagementAgent; - - struct Periodic : public qpid::broker::TimerTask - { - ManagementBroker& broker; - - Periodic (ManagementBroker& broker, uint32_t seconds); - virtual ~Periodic (); - void fire (); - }; - - // Storage for tracking remote management agents, attached via the client - // management agent API. - // - struct RemoteAgent : public Manageable - { - uint32_t brokerBank; - uint32_t agentBank; - std::string routingKey; - ObjectId connectionRef; - qmf::org::apache::qpid::broker::Agent* mgmtObject; - ManagementObject* GetManagementObject (void) const { return mgmtObject; } - virtual ~RemoteAgent (); - }; - - // TODO: Eventually replace string with entire reply-to structure. reply-to - // currently assumes that the exchange is "amq.direct" even though it could - // in theory be specified differently. - typedef std::map<ObjectId, RemoteAgent*> RemoteAgentMap; - typedef std::vector<std::string> ReplyToVector; - - // Storage for known schema classes: - // - // SchemaClassKey -- Key elements for map lookups - // SchemaClassKeyComp -- Comparison class for SchemaClassKey - // SchemaClass -- Non-key elements for classes - // - struct SchemaClassKey - { - std::string name; - uint8_t hash[16]; - }; - - struct SchemaClassKeyComp - { - bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const - { - if (lhs.name != rhs.name) - return lhs.name < rhs.name; - else - for (int i = 0; i < 16; i++) - if (lhs.hash[i] != rhs.hash[i]) - return lhs.hash[i] < rhs.hash[i]; - return false; - } - }; - - struct SchemaClass - { - uint8_t kind; - ManagementObject::writeSchemaCall_t writeSchemaCall; - uint32_t pendingSequence; - size_t bufferLen; - uint8_t* buffer; - - SchemaClass(uint8_t _kind, uint32_t seq) : - kind(_kind), writeSchemaCall(0), pendingSequence(seq), bufferLen(0), buffer(0) {} - SchemaClass(uint8_t _kind, ManagementObject::writeSchemaCall_t call) : - kind(_kind), writeSchemaCall(call), pendingSequence(0), bufferLen(0), buffer(0) {} - bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); } - void appendSchema (framing::Buffer& buf); - }; - - typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap; - typedef std::map<std::string, ClassMap> PackageMap; - - RemoteAgentMap remoteAgents; - PackageMap packages; - ManagementObjectMap managementObjects; - ManagementObjectMap newManagementObjects; - - static ManagementAgent* agent; - static bool enabled; - - framing::Uuid uuid; - sys::Mutex addLock; - sys::Mutex userLock; - qpid::broker::Timer timer; - qpid::broker::Exchange::shared_ptr mExchange; - qpid::broker::Exchange::shared_ptr dExchange; - std::string dataDir; - uint16_t interval; - qpid::broker::Broker* broker; - uint16_t bootSequence; - uint32_t nextObjectId; - uint32_t brokerBank; - uint32_t nextRemoteBank; - uint32_t nextRequestSequence; - bool clientWasAdded; - -# define MA_BUFFER_SIZE 65536 - char inputBuffer[MA_BUFFER_SIZE]; - char outputBuffer[MA_BUFFER_SIZE]; - char eventBuffer[MA_BUFFER_SIZE]; - - void writeData (); - void periodicProcessing (void); - void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); - bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); - void sendBuffer (framing::Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - std::string routingKey); - void moveNewObjectsLH(); - - bool authorizeAgentMessageLH(qpid::broker::Message& msg); - void dispatchAgentCommandLH(qpid::broker::Message& msg); - - PackageMap::iterator findOrAddPackageLH(std::string name); - void addClassLH(uint8_t kind, - PackageMap::iterator pIter, - const std::string& className, - uint8_t* md5Sum, - ManagementObject::writeSchemaCall_t schemaCall); - void encodePackageIndication (framing::Buffer& buf, - PackageMap::iterator pIter); - void encodeClassIndication (framing::Buffer& buf, - PackageMap::iterator pIter, - ClassMap::iterator cIter); - bool bankInUse (uint32_t bank); - uint32_t allocateNewBank (); - uint32_t assignBankLH (uint32_t requestedPrefix); - void deleteOrphanedAgentsLH(); - void sendCommandComplete (std::string replyToKey, uint32_t sequence, - uint32_t code = 0, std::string text = std::string("OK")); - void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence); - void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken); - - size_t validateSchema(framing::Buffer&, uint8_t kind); - size_t validateTableSchema(framing::Buffer&); - size_t validateEventSchema(framing::Buffer&); -}; - -}} - -#endif /*!_ManagementBroker_*/ |