summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/agent/ManagementAgentImpl.h')
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h298
1 files changed, 298 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
new file mode 100644
index 0000000000..bf340777d1
--- /dev/null
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -0,0 +1,298 @@
+#ifndef _qpid_agent_ManagementAgentImpl_
+#define _qpid_agent_ManagementAgentImpl_
+
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/agent/ManagementAgent.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/ConnectionSettings.h"
+#include "qpid/client/SubscriptionManager.h"
+#include "qpid/client/Session.h"
+#include "qpid/client/AsyncSession.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/MessageListener.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/PipeHandle.h"
+#include "qpid/sys/Time.h"
+#include "qpid/framing/Uuid.h"
+#include <iostream>
+#include <sstream>
+#include <deque>
+
+namespace qpid {
+namespace management {
+
+class ManagementAgentImpl : public ManagementAgent, public client::MessageListener
+{
+ public:
+
+ ManagementAgentImpl();
+ virtual ~ManagementAgentImpl();
+
+ //
+ // Methods from ManagementAgent
+ //
+ int getMaxThreads() { return 1; }
+ void setName(const std::string& vendor,
+ const std::string& product,
+ const std::string& instance="");
+ void getName(std::string& vendor, std::string& product, std::string& instance);
+ const std::string& getAddress();
+ void init(const std::string& brokerHost = "localhost",
+ uint16_t brokerPort = 5672,
+ uint16_t intervalSeconds = 10,
+ bool useExternalThread = false,
+ const std::string& storeFile = "",
+ const std::string& uid = "guest",
+ const std::string& pwd = "guest",
+ const std::string& mech = "PLAIN",
+ const std::string& proto = "tcp");
+ void init(const management::ConnectionSettings& settings,
+ uint16_t intervalSeconds = 10,
+ bool useExternalThread = false,
+ const std::string& storeFile = "");
+ bool isConnected() { return connected; }
+ std::string& getLastFailure() { return lastFailure; }
+ void registerClass(const std::string& packageName,
+ const std::string& className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall);
+ void registerEvent(const std::string& packageName,
+ const std::string& eventName,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall);
+ ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
+ ObjectId addObject(management::ManagementObject* objectPtr, const std::string& key,
+ bool persistent);
+ void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT);
+ uint32_t pollCallbacks(uint32_t callLimit = 0);
+ int getSignalFd();
+ void setSignalCallback(cb_t callback, void* context);
+ void setSignalCallback(Notifyable& n);
+
+ uint16_t getInterval() { return interval; }
+ void periodicProcessing();
+
+ // these next are here to support the hot-wiring of state between clustered brokers
+ uint64_t getNextObjectId(void) { return nextObjectId; }
+ void setNextObjectId(uint64_t o) { nextObjectId = o; }
+
+ uint16_t getBootSequence(void) { return bootSequence; }
+ void setBootSequence(uint16_t b) { bootSequence = b; }
+
+ private:
+
+ struct SchemaClassKey {
+ std::string name;
+ uint8_t hash[16];
+ };
+
+ struct SchemaClassKeyComp {
+ bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ struct SchemaClass {
+ management::ManagementObject::writeSchemaCall_t writeSchemaCall;
+ uint8_t kind;
+
+ SchemaClass(const management::ManagementObject::writeSchemaCall_t call,
+ const uint8_t _kind) : writeSchemaCall(call), kind(_kind) {}
+ };
+
+ struct QueuedMethod {
+ QueuedMethod(const std::string& _cid, const std::string& _rte, const std::string& _rtk, const std::string& _body, const std::string& _uid) :
+ cid(_cid), replyToExchange(_rte), replyToKey(_rtk), body(_body), userId(_uid) {}
+
+ std::string cid;
+ std::string replyToExchange;
+ std::string replyToKey;
+ std::string body;
+ std::string userId;
+ };
+
+ typedef std::deque<QueuedMethod*> MethodQueue;
+ typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
+ typedef std::map<std::string, ClassMap> PackageMap;
+
+ PackageMap packages;
+ AgentAttachment attachment;
+
+ typedef std::map<ObjectId, boost::shared_ptr<ManagementObject> > ObjectMap;
+
+ ObjectMap managementObjects;
+ ObjectMap newManagementObjects;
+ MethodQueue methodQueue;
+
+ void received (client::Message& msg);
+
+ qpid::types::Variant::Map attrMap;
+ std::string name_address;
+ std::string vendorNameKey; // vendor name with "." --> "_"
+ std::string productNameKey; // product name with "." --> "_"
+ std::string instanceNameKey; // agent instance with "." --> "_"
+ uint16_t interval;
+ bool extThread;
+ sys::PipeHandle* pipeHandle;
+ uint64_t nextObjectId;
+ cb_t notifyCallback;
+ void* notifyContext;
+ Notifyable* notifyable;
+ bool inCallback;
+ std::string storeFile;
+ sys::Mutex agentLock;
+ sys::Mutex addLock;
+ framing::Uuid systemId;
+ client::ConnectionSettings connectionSettings;
+ bool initialized;
+ bool connected;
+ bool useMapMsg;
+ std::string lastFailure;
+ std::string topicExchange;
+ std::string directExchange;
+ qpid::sys::Duration schemaTimestamp;
+
+ bool publishAllData;
+ uint32_t requestedBrokerBank;
+ uint32_t requestedAgentBank;
+ uint32_t assignedBrokerBank;
+ uint32_t assignedAgentBank;
+ uint16_t bootSequence;
+
+ // Maximum # of objects allowed in a single V2 response
+ // message.
+ uint32_t maxV2ReplyObjs;
+
+ static const uint8_t DEBUG_OFF = 0;
+ static const uint8_t DEBUG_CONN = 1;
+ static const uint8_t DEBUG_PROTO = 2;
+ static const uint8_t DEBUG_PUBLISH = 3;
+
+# define MA_BUFFER_SIZE 65536
+ char outputBuffer[MA_BUFFER_SIZE];
+ char eventBuffer[MA_BUFFER_SIZE];
+
+ friend class ConnectionThread;
+ class ConnectionThread : public sys::Runnable
+ {
+ typedef boost::shared_ptr<client::SubscriptionManager> shared_ptr;
+
+ bool operational;
+ ManagementAgentImpl& agent;
+ framing::Uuid sessionId;
+ client::Connection connection;
+ client::Session session;
+ ConnectionThread::shared_ptr subscriptions;
+ std::stringstream queueName;
+ mutable sys::Mutex connLock;
+ bool shutdown;
+ bool sleeping;
+ void run();
+ public:
+ ConnectionThread(ManagementAgentImpl& _agent) :
+ operational(false), agent(_agent),
+ shutdown(false), sleeping(false) {}
+ ~ConnectionThread();
+ void sendBuffer(qpid::framing::Buffer& buf,
+ uint32_t length,
+ const std::string& exchange,
+ const std::string& routingKey);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map headers,
+ const std::string& exchange,
+ const std::string& routingKey,
+ const std::string& contentType="amqp/map",
+ uint64_t ttl_msec=0);
+ void sendMessage(qpid::client::Message msg,
+ const std::string& exchange,
+ const std::string& routingKey);
+ void bindToBank(uint32_t brokerBank, uint32_t agentBank);
+ void close();
+ bool isSleeping() const;
+ };
+
+ class PublishThread : public sys::Runnable
+ {
+ ManagementAgentImpl& agent;
+ void run();
+ bool shutdown;
+ public:
+ PublishThread(ManagementAgentImpl& _agent) :
+ agent(_agent), shutdown(false) {}
+ void close() { shutdown = true; }
+ };
+
+ ConnectionThread connThreadBody;
+ sys::Thread connThread;
+ PublishThread pubThreadBody;
+ sys::Thread pubThread;
+
+ static const std::string storeMagicNumber;
+
+ void startProtocol();
+ void storeData(bool requested=false);
+ void retrieveData(std::string& vendor, std::string& product, std::string& inst);
+ PackageMap::iterator findOrAddPackage(const std::string& name);
+ void moveNewObjectsLH();
+ void addClassLocal (uint8_t classKind,
+ PackageMap::iterator pIter,
+ const std::string& className,
+ uint8_t* md5Sum,
+ management::ManagementObject::writeSchemaCall_t schemaCall);
+ void encodePackageIndication (framing::Buffer& buf,
+ PackageMap::iterator pIter);
+ void encodeClassIndication (framing::Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter);
+ void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname,
+ const std::string& cname,
+ const uint8_t *md5Sum,
+ uint8_t type=ManagementItem::CLASS_KIND_TABLE);
+ bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void sendHeartbeat();
+ void sendException(const std::string& replyToExchange, const std::string& replyToKey, const std::string& cid,
+ const std::string& text, uint32_t code=1);
+ void handlePackageRequest (qpid::framing::Buffer& inBuffer);
+ void handleClassQuery (qpid::framing::Buffer& inBuffer);
+ void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& rte, const std::string& rtk);
+ void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk, const std::string& userId);
+
+ void handleGetQuery (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk);
+ void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk);
+ void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk, const std::string& userId);
+ void handleConsoleAddedIndication();
+ void getHeartbeatContent (qpid::types::Variant::Map& map);
+};
+
+}}
+
+#endif /*!_qpid_agent_ManagementAgentImpl_*/