summaryrefslogtreecommitdiff
path: root/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
diff options
context:
space:
mode:
Diffstat (limited to 'trunk/qpid/cpp/src/qpid/management/ManagementBroker.h')
-rw-r--r--trunk/qpid/cpp/src/qpid/management/ManagementBroker.h216
1 files changed, 216 insertions, 0 deletions
diff --git a/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h b/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
new file mode 100644
index 0000000000..151926f526
--- /dev/null
+++ b/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
@@ -0,0 +1,216 @@
+#ifndef _ManagementBroker_
+#define _ManagementBroker_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/Options.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Timer.h"
+#include "qpid/framing/Uuid.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/broker/ConnectionToken.h"
+#include "qpid/agent/ManagementAgent.h"
+#include "ManagementObject.h"
+#include "Manageable.h"
+#include "qpid/management/Agent.h"
+#include <qpid/framing/AMQFrame.h>
+
+namespace qpid {
+namespace management {
+
+class ManagementBroker : public ManagementAgent
+{
+ private:
+
+ int threadPoolSize;
+
+ public:
+
+ ManagementBroker ();
+ virtual ~ManagementBroker ();
+
+ void configure (std::string dataDir, uint16_t interval, Manageable* broker, int threadPoolSize);
+ void setInterval (uint16_t _interval) { interval = _interval; }
+ void setExchange (broker::Exchange::shared_ptr mgmtExchange,
+ broker::Exchange::shared_ptr directExchange);
+ int getMaxThreads () { return threadPoolSize; }
+ void RegisterClass (std::string packageName,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ uint64_t addObject (ManagementObject* object,
+ uint32_t persistId = 0,
+ uint32_t persistBank = 4);
+ void clientAdded (void);
+ bool dispatchCommand (broker::Deliverable& msg,
+ const std::string& routingKey,
+ const framing::FieldTable* args);
+
+ // Stubs for remote management agent calls
+ void init (std::string, uint16_t, uint16_t, bool) { assert(0); }
+ uint32_t pollCallbacks (uint32_t) { assert(0); return 0; }
+ int getSignalFd () { assert(0); return -1; }
+
+ private:
+ friend class ManagementAgent;
+
+ struct Periodic : public broker::TimerTask
+ {
+ ManagementBroker& broker;
+
+ Periodic (ManagementBroker& broker, uint32_t seconds);
+ virtual ~Periodic ();
+ void fire ();
+ };
+
+ // Storage for tracking remote management agents, attached via the client
+ // management agent API.
+ //
+ struct RemoteAgent : public Manageable
+ {
+ uint32_t objIdBank;
+ std::string routingKey;
+ uint64_t connectionRef;
+ Agent* mgmtObject;
+ ManagementObject* GetManagementObject (void) const { return mgmtObject; }
+ virtual ~RemoteAgent ();
+ };
+
+ // TODO: Eventually replace string with entire reply-to structure. reply-to
+ // currently assumes that the exchange is "amq.direct" even though it could
+ // in theory be specified differently.
+ typedef std::map<uint64_t, RemoteAgent*> RemoteAgentMap;
+ typedef std::vector<std::string> ReplyToVector;
+
+ // Storage for known schema classes:
+ //
+ // SchemaClassKey -- Key elements for map lookups
+ // SchemaClassKeyComp -- Comparison class for SchemaClassKey
+ // SchemaClass -- Non-key elements for classes
+ //
+ struct SchemaClassKey
+ {
+ std::string name;
+ uint8_t hash[16];
+ };
+
+ struct SchemaClassKeyComp
+ {
+ bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
+ {
+ if (lhs.name != rhs.name)
+ return lhs.name < rhs.name;
+ else
+ for (int i = 0; i < 16; i++)
+ if (lhs.hash[i] != rhs.hash[i])
+ return lhs.hash[i] < rhs.hash[i];
+ return false;
+ }
+ };
+
+ struct SchemaClass
+ {
+ ManagementObject::writeSchemaCall_t writeSchemaCall;
+ uint32_t pendingSequence;
+ size_t bufferLen;
+ uint8_t* buffer;
+
+ SchemaClass () : writeSchemaCall(0), pendingSequence(0), bufferLen(0), buffer(0) {}
+ bool hasSchema () { return (writeSchemaCall != 0) || (buffer != 0); }
+ void appendSchema (framing::Buffer& buf);
+ };
+
+ typedef std::map<SchemaClassKey, SchemaClass*, SchemaClassKeyComp> ClassMap;
+ typedef std::map<std::string, ClassMap> PackageMap;
+
+ RemoteAgentMap remoteAgents;
+ PackageMap packages;
+ ManagementObjectMap managementObjects;
+ ManagementObjectMap newManagementObjects;
+
+ static ManagementAgent* agent;
+ static bool enabled;
+
+ framing::Uuid uuid;
+ sys::Mutex addLock;
+ sys::Mutex userLock;
+ broker::Timer timer;
+ broker::Exchange::shared_ptr mExchange;
+ broker::Exchange::shared_ptr dExchange;
+ std::string dataDir;
+ uint16_t interval;
+ Manageable* broker;
+ uint16_t bootSequence;
+ uint32_t localBank;
+ uint32_t nextObjectId;
+ uint32_t nextRemoteBank;
+ uint32_t nextRequestSequence;
+ bool clientWasAdded;
+
+# define MA_BUFFER_SIZE 65536
+ char inputBuffer[MA_BUFFER_SIZE];
+ char outputBuffer[MA_BUFFER_SIZE];
+
+ void writeData ();
+ void PeriodicProcessing (void);
+ void EncodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
+ bool CheckHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
+ void SendBuffer (framing::Buffer& buf,
+ uint32_t length,
+ broker::Exchange::shared_ptr exchange,
+ std::string routingKey);
+ void moveNewObjectsLH();
+
+ void dispatchAgentCommandLH (broker::Message& msg);
+
+ PackageMap::iterator FindOrAddPackageLH(std::string name);
+ void AddClass(PackageMap::iterator pIter,
+ std::string className,
+ uint8_t* md5Sum,
+ ManagementObject::writeSchemaCall_t schemaCall);
+ void EncodePackageIndication (framing::Buffer& buf,
+ PackageMap::iterator pIter);
+ void EncodeClassIndication (framing::Buffer& buf,
+ PackageMap::iterator pIter,
+ ClassMap::iterator cIter);
+ bool bankInUse (uint32_t bank);
+ uint32_t allocateNewBank ();
+ uint32_t assignBankLH (uint32_t requestedPrefix);
+ void deleteOrphanedAgentsLH();
+ void sendCommandComplete (std::string replyToKey, uint32_t sequence,
+ uint32_t code = 0, std::string text = std::string("OK"));
+ void handleBrokerRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handlePackageIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleClassIndLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleSchemaResponseLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleAttachRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence, const broker::ConnectionToken* connToken);
+ void handleGetQueryLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+ void handleMethodRequestLH (framing::Buffer& inBuffer, std::string replyToKey, uint32_t sequence);
+
+ size_t ValidateSchema(framing::Buffer&);
+};
+
+}}
+
+#endif /*!_ManagementBroker_*/