summaryrefslogtreecommitdiff
path: root/M4-RCs/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
diff options
context:
space:
mode:
Diffstat (limited to 'M4-RCs/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h')
-rw-r--r--M4-RCs/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h237
1 files changed, 0 insertions, 237 deletions
diff --git a/M4-RCs/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/M4-RCs/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
deleted file mode 100644
index 53eb690ba8..0000000000
--- a/M4-RCs/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ /dev/null
@@ -1,237 +0,0 @@
-#ifndef _qpid_agent_ManagementAgentImpl_
-#define _qpid_agent_ManagementAgentImpl_
-
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-
-#include "ManagementAgent.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/ConnectionSettings.h"
-#include "qpid/client/SubscriptionManager.h"
-#include "qpid/client/Session.h"
-#include "qpid/client/AsyncSession.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/MessageListener.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/framing/Uuid.h"
-#include <iostream>
-#include <sstream>
-#include <deque>
-
-namespace qpid {
-namespace management {
-
-class ManagementAgentImpl : public ManagementAgent, public client::MessageListener
-{
- public:
-
- ManagementAgentImpl();
- virtual ~ManagementAgentImpl();
-
- //
- // Methods from ManagementAgent
- //
- int getMaxThreads() { return 1; }
- void init(const std::string& brokerHost = "localhost",
- uint16_t brokerPort = 5672,
- uint16_t intervalSeconds = 10,
- bool useExternalThread = false,
- const std::string& storeFile = "",
- const std::string& uid = "guest",
- const std::string& pwd = "guest",
- const std::string& mech = "PLAIN",
- const std::string& proto = "tcp");
- void init(const client::ConnectionSettings& settings,
- uint16_t intervalSeconds = 10,
- bool useExternalThread = false,
- const std::string& storeFile = "");
- bool isConnected() { return connected; }
- std::string& getLastFailure() { return lastFailure; }
- void registerClass(const std::string& packageName,
- const std::string& className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall);
- void registerEvent(const std::string& packageName,
- const std::string& eventName,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall);
- ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
- void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT);
- uint32_t pollCallbacks(uint32_t callLimit = 0);
- int getSignalFd();
-
- uint16_t getInterval() { return interval; }
- void periodicProcessing();
-
- private:
-
- struct SchemaClassKey {
- std::string name;
- uint8_t hash[16];
- };
-
- struct SchemaClassKeyComp {
- bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
- {
- if (lhs.name != rhs.name)
- return lhs.name < rhs.name;
- else
- for (int i = 0; i < 16; i++)
- if (lhs.hash[i] != rhs.hash[i])
- return lhs.hash[i] < rhs.hash[i];
- return false;
- }
- };
-
- struct SchemaClass {
- management::ManagementObject::writeSchemaCall_t writeSchemaCall;
- uint8_t kind;
-
- SchemaClass(const management::ManagementObject::writeSchemaCall_t call,
- const uint8_t _kind) : writeSchemaCall(call), kind(_kind) {}
- };
-
- struct QueuedMethod {
- QueuedMethod(uint32_t _seq, std::string _reply, std::string _body) :
- sequence(_seq), replyTo(_reply), body(_body) {}
-
- uint32_t sequence;
- std::string replyTo;
- std::string body;
- };
-
- typedef std::deque<QueuedMethod*> MethodQueue;
- typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
- typedef std::map<std::string, ClassMap> PackageMap;
-
- PackageMap packages;
- AgentAttachment attachment;
- management::ManagementObjectMap managementObjects;
- management::ManagementObjectMap newManagementObjects;
- MethodQueue methodQueue;
-
- void received (client::Message& msg);
-
- uint16_t interval;
- bool extThread;
- int writeFd;
- int readFd;
- uint64_t nextObjectId;
- std::string storeFile;
- sys::Mutex agentLock;
- sys::Mutex addLock;
- framing::Uuid systemId;
- client::ConnectionSettings connectionSettings;
- bool initialized;
- bool connected;
- std::string lastFailure;
-
- bool clientWasAdded;
- uint32_t requestedBrokerBank;
- uint32_t requestedAgentBank;
- uint32_t assignedBrokerBank;
- uint32_t assignedAgentBank;
- uint16_t bootSequence;
-
- static const uint8_t DEBUG_OFF = 0;
- static const uint8_t DEBUG_CONN = 1;
- static const uint8_t DEBUG_PROTO = 2;
- static const uint8_t DEBUG_PUBLISH = 3;
-
-# define MA_BUFFER_SIZE 65536
- char outputBuffer[MA_BUFFER_SIZE];
- char eventBuffer[MA_BUFFER_SIZE];
-
- friend class ConnectionThread;
- class ConnectionThread : public sys::Runnable
- {
- bool operational;
- ManagementAgentImpl& agent;
- framing::Uuid sessionId;
- client::Connection connection;
- client::Session session;
- client::SubscriptionManager* subscriptions;
- std::stringstream queueName;
- mutable sys::Mutex connLock;
- bool shutdown;
- bool sleeping;
- void run();
- public:
- ConnectionThread(ManagementAgentImpl& _agent) :
- operational(false), agent(_agent), subscriptions(0),
- shutdown(false), sleeping(false) {}
- ~ConnectionThread();
- void sendBuffer(qpid::framing::Buffer& buf,
- uint32_t length,
- const std::string& exchange,
- const std::string& routingKey);
- void bindToBank(uint32_t brokerBank, uint32_t agentBank);
- void close();
- bool isSleeping() const;
- };
-
- class PublishThread : public sys::Runnable
- {
- ManagementAgentImpl& agent;
- void run();
- public:
- PublishThread(ManagementAgentImpl& _agent) : agent(_agent) {}
- };
-
- ConnectionThread connThreadBody;
- sys::Thread connThread;
- PublishThread pubThreadBody;
- sys::Thread pubThread;
-
- static const std::string storeMagicNumber;
-
- void startProtocol();
- void storeData(bool requested=false);
- void retrieveData();
- PackageMap::iterator findOrAddPackage(const std::string& name);
- void moveNewObjectsLH();
- void addClassLocal (uint8_t classKind,
- PackageMap::iterator pIter,
- const std::string& className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall);
- void encodePackageIndication (framing::Buffer& buf,
- PackageMap::iterator pIter);
- void encodeClassIndication (framing::Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter);
- void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void sendCommandComplete (std::string replyToKey, uint32_t sequence,
- uint32_t code = 0, std::string text = std::string("OK"));
- void handleAttachResponse (qpid::framing::Buffer& inBuffer);
- void handlePackageRequest (qpid::framing::Buffer& inBuffer);
- void handleClassQuery (qpid::framing::Buffer& inBuffer);
- void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence);
- void invokeMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
- void handleGetQuery (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
- void handleMethodRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, std::string replyTo);
- void handleConsoleAddedIndication();
-};
-
-}}
-
-#endif /*!_qpid_agent_ManagementAgentImpl_*/