From d4d4a9f2ba6b9c457eba9c5ae0b5939d72bd2743 Mon Sep 17 00:00:00 2001 From: "Carl C. Trieloff" Date: Wed, 31 Oct 2007 20:06:05 +0000 Subject: Patch from Ted QPID-668 This patch does two things: 1) Adds management objects for "broker" and "virtual host". 2) Moves all management-related source files from qpid/broker to qpid/broker/management. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@590806 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/broker/Broker.cpp | 10 +- cpp/src/qpid/broker/Broker.h | 14 +- cpp/src/qpid/broker/ExchangeRegistry.cpp | 2 +- cpp/src/qpid/broker/ManagementAgent.cpp | 205 --------------------- cpp/src/qpid/broker/ManagementAgent.h | 73 -------- cpp/src/qpid/broker/ManagementExchange.cpp | 76 -------- cpp/src/qpid/broker/ManagementExchange.h | 61 ------ cpp/src/qpid/broker/ManagementObject.cpp | 54 ------ cpp/src/qpid/broker/ManagementObject.h | 117 ------------ cpp/src/qpid/broker/ManagementObjectQueue.cpp | 182 ------------------ cpp/src/qpid/broker/ManagementObjectQueue.h | 180 ------------------ cpp/src/qpid/broker/Queue.h | 2 +- cpp/src/qpid/broker/QueueRegistry.cpp | 4 +- cpp/src/qpid/broker/QueueRegistry.h | 2 +- cpp/src/qpid/broker/management/ManagementAgent.cpp | 204 ++++++++++++++++++++ cpp/src/qpid/broker/management/ManagementAgent.h | 73 ++++++++ .../qpid/broker/management/ManagementExchange.cpp | 76 ++++++++ .../qpid/broker/management/ManagementExchange.h | 61 ++++++ .../qpid/broker/management/ManagementObject.cpp | 54 ++++++ cpp/src/qpid/broker/management/ManagementObject.h | 117 ++++++++++++ .../broker/management/ManagementObjectBroker.cpp | 98 ++++++++++ .../broker/management/ManagementObjectBroker.h | 75 ++++++++ .../broker/management/ManagementObjectQueue.cpp | 186 +++++++++++++++++++ .../qpid/broker/management/ManagementObjectQueue.h | 181 ++++++++++++++++++ .../broker/management/ManagementObjectVhost.cpp | 54 ++++++ .../qpid/broker/management/ManagementObjectVhost.h | 63 +++++++ 26 files changed, 1265 insertions(+), 959 deletions(-) delete mode 100644 cpp/src/qpid/broker/ManagementAgent.cpp delete mode 100644 cpp/src/qpid/broker/ManagementAgent.h delete mode 100644 cpp/src/qpid/broker/ManagementExchange.cpp delete mode 100644 cpp/src/qpid/broker/ManagementExchange.h delete mode 100644 cpp/src/qpid/broker/ManagementObject.cpp delete mode 100644 cpp/src/qpid/broker/ManagementObject.h delete mode 100644 cpp/src/qpid/broker/ManagementObjectQueue.cpp delete mode 100644 cpp/src/qpid/broker/ManagementObjectQueue.h create mode 100644 cpp/src/qpid/broker/management/ManagementAgent.cpp create mode 100644 cpp/src/qpid/broker/management/ManagementAgent.h create mode 100644 cpp/src/qpid/broker/management/ManagementExchange.cpp create mode 100644 cpp/src/qpid/broker/management/ManagementExchange.h create mode 100644 cpp/src/qpid/broker/management/ManagementObject.cpp create mode 100644 cpp/src/qpid/broker/management/ManagementObject.h create mode 100644 cpp/src/qpid/broker/management/ManagementObjectBroker.cpp create mode 100644 cpp/src/qpid/broker/management/ManagementObjectBroker.h create mode 100644 cpp/src/qpid/broker/management/ManagementObjectQueue.cpp create mode 100644 cpp/src/qpid/broker/management/ManagementObjectQueue.h create mode 100644 cpp/src/qpid/broker/management/ManagementObjectVhost.cpp create mode 100644 cpp/src/qpid/broker/management/ManagementObjectVhost.h (limited to 'cpp/src/qpid') diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 051c872e77..01f8a04545 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -28,7 +28,7 @@ #include "NullMessageStore.h" #include "RecoveryManagerImpl.h" #include "TopicExchange.h" -#include "ManagementExchange.h" +#include "management/ManagementExchange.h" #include "qpid/log/Statement.h" #include "qpid/Url.h" @@ -125,6 +125,14 @@ Broker::Broker(const Broker::Options& conf) : Exchange::shared_ptr mExchange = exchanges.get (qpid_management); managementAgent->setExchange (mExchange); dynamic_pointer_cast(mExchange)->setManagmentAgent (managementAgent); + + mgmtObject = ManagementObjectBroker::shared_ptr (new ManagementObjectBroker (conf)); + managementAgent->addObject (dynamic_pointer_cast(mgmtObject)); + + // Since there is currently no support for virtual hosts, a management object + // representing the implied single virtual host is added here. + mgmtVhostObject = ManagementObjectVhost::shared_ptr (new ManagementObjectVhost (conf)); + managementAgent->addObject (dynamic_pointer_cast(mgmtVhostObject)); } else QPID_LOG(info, "Management not enabled"); diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 817197a351..5a67fb7212 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -30,7 +30,9 @@ #include "MessageStore.h" #include "QueueRegistry.h" #include "SessionManager.h" -#include "ManagementAgent.h" +#include "management/ManagementAgent.h" +#include "management/ManagementObjectBroker.h" +#include "management/ManagementObjectVhost.h" #include "qpid/Options.h" #include "qpid/Plugin.h" #include "qpid/Url.h" @@ -65,10 +67,10 @@ class Broker : public sys::Runnable, public Plugin::Target int connectionBacklog; std::string store; long stagingThreshold; - string storeDir; - bool storeAsync; - bool enableMgmt; - uint16_t mgmtPubInterval; + string storeDir; + bool storeAsync; + bool enableMgmt; + uint16_t mgmtPubInterval; uint32_t ack; }; @@ -129,6 +131,8 @@ class Broker : public sys::Runnable, public Plugin::Target HandlerUpdaters handlerUpdaters; SessionManager sessionManager; ManagementAgent::shared_ptr managementAgent; + ManagementObjectBroker::shared_ptr mgmtObject; + ManagementObjectVhost::shared_ptr mgmtVhostObject; static MessageStore* createStore(const Options& config); }; diff --git a/cpp/src/qpid/broker/ExchangeRegistry.cpp b/cpp/src/qpid/broker/ExchangeRegistry.cpp index 98e3cc7347..35660cfa0b 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.cpp +++ b/cpp/src/qpid/broker/ExchangeRegistry.cpp @@ -23,7 +23,7 @@ #include "FanOutExchange.h" #include "HeadersExchange.h" #include "TopicExchange.h" -#include "ManagementExchange.h" +#include "management/ManagementExchange.h" #include "qpid/framing/reply_exceptions.h" using namespace qpid::broker; diff --git a/cpp/src/qpid/broker/ManagementAgent.cpp b/cpp/src/qpid/broker/ManagementAgent.cpp deleted file mode 100644 index 2f6a0597f0..0000000000 --- a/cpp/src/qpid/broker/ManagementAgent.cpp +++ /dev/null @@ -1,205 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementAgent.h" -#include "DeliverableMessage.h" -#include "qpid/log/Statement.h" -#include -#include -#include -#include - -using namespace qpid::framing; -using namespace qpid::broker; -using namespace qpid::sys; - -ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) -{ - timer.add (TimerTask::shared_ptr (new Periodic(*this, interval))); -} - -void ManagementAgent::setExchange (Exchange::shared_ptr _exchange) -{ - exchange = _exchange; -} - -void ManagementAgent::addObject (ManagementObject::shared_ptr object) -{ - managementObjects.push_back (object); - QPID_LOG(info, "Management Object Added"); -} - -ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} - -void ManagementAgent::Periodic::fire () -{ - agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval))); - agent.PeriodicProcessing (); -} - -void ManagementAgent::clientAdded (void) -{ - for (ManagementObjectVector::iterator iter = managementObjects.begin (); - iter != managementObjects.end (); - iter++) - { - ManagementObject::shared_ptr object = *iter; - object->setAllChanged (); - object->setSchemaNeeded (); - } -} - -void ManagementAgent::PeriodicProcessing (void) -{ -#define BUFSIZE 65536 -#define THRESHOLD 16384 - char msgChars[BUFSIZE]; - Buffer msgBuffer (msgChars, BUFSIZE); - uint32_t contentSize; - std::list deleteList; - - if (managementObjects.empty ()) - return; - - Message::shared_ptr msg (new Message ()); - - // Build the magic number for the management message. - msgBuffer.putOctet ('A'); - msgBuffer.putOctet ('M'); - msgBuffer.putOctet ('0'); - msgBuffer.putOctet ('1'); - - for (uint32_t idx = 0; idx < managementObjects.size (); idx++) - { - ManagementObject::shared_ptr object = managementObjects[idx]; - - if (object->getSchemaNeeded ()) - { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('S'); // opcode = Schema Record - msgBuffer.putOctet (0); // content-class = N/A - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - object->writeSchema (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (object->getConfigChanged ()) - { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('C'); // content-class = Configuration - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - object->writeConfig (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (object->getInstChanged ()) - { - uint32_t startAvail = msgBuffer.available (); - uint32_t recordLength; - - msgBuffer.putOctet ('C'); // opcode = Content Record - msgBuffer.putOctet ('I'); // content-class = Instrumentation - msgBuffer.putShort (object->getObjectType ()); - msgBuffer.record (); // Record the position of the length field - msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length - - object->writeInstrumentation (msgBuffer); - recordLength = startAvail - msgBuffer.available (); - msgBuffer.restore (true); // Restore pointer to length field - msgBuffer.putLong (recordLength); - msgBuffer.restore (); // Re-restore to get to the end of the buffer - } - - if (object->isDeleted ()) - deleteList.push_back (idx); - - // Temporary protection against buffer overrun. - // This needs to be replaced with frame fragmentation. - if (msgBuffer.available () < THRESHOLD) - break; - } - - msgBuffer.putOctet ('X'); // End-of-message - msgBuffer.putOctet (0); - msgBuffer.putShort (0); - msgBuffer.putLong (8); - - contentSize = BUFSIZE - msgBuffer.available (); - msgBuffer.reset (); - - AMQFrame method (0, MessageTransferBody(ProtocolVersion(), - 0, "qpid.management", 0, 0)); - AMQFrame header (0, AMQHeaderBody()); - AMQFrame content; - - content.setBody(AMQContentBody()); - content.castBody()->decode(msgBuffer, contentSize); - - method.setEof (false); - header.setBof (false); - header.setEof (false); - content.setBof (false); - - msg->getFrames().append(method); - msg->getFrames().append(header); - - MessageProperties* props = msg->getFrames().getHeaders()->get(true); - props->setContentLength(contentSize); - //msg->getFrames().getHeaders()->get(true)->setRoutingKey("mgmt"); - msg->getFrames().append(content); - - DeliverableMessage deliverable (msg); - exchange->route (deliverable, "mgmt", 0); - - // Delete flagged objects - for (std::list::reverse_iterator iter = deleteList.rbegin (); - iter != deleteList.rend (); - iter++) - { - managementObjects.erase (managementObjects.begin () + *iter); - } - deleteList.clear (); -} - -void ManagementAgent::dispatchCommand (Deliverable& /*msg*/, - const string& /*routingKey*/, - const FieldTable* /*args*/) -{ -} - diff --git a/cpp/src/qpid/broker/ManagementAgent.h b/cpp/src/qpid/broker/ManagementAgent.h deleted file mode 100644 index e1d62270db..0000000000 --- a/cpp/src/qpid/broker/ManagementAgent.h +++ /dev/null @@ -1,73 +0,0 @@ -#ifndef _ManagementAgent_ -#define _ManagementAgent_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/Options.h" -#include "Exchange.h" -#include "ManagementObject.h" -#include "Timer.h" -#include - -namespace qpid { -namespace broker { - - -class ManagementAgent -{ - public: - - typedef boost::shared_ptr shared_ptr; - - ManagementAgent (uint16_t interval); - - void setExchange (Exchange::shared_ptr exchange); - void addObject (ManagementObject::shared_ptr object); - void clientAdded (void); - void dispatchCommand (Deliverable& msg, - const string& routingKey, - const FieldTable* args); - - private: - - struct Periodic : public TimerTask - { - ManagementAgent& agent; - - Periodic (ManagementAgent& agent, uint32_t seconds); - ~Periodic () {} - void fire (); - }; - - ManagementObjectVector managementObjects; - Timer timer; - Exchange::shared_ptr exchange; - uint16_t interval; - - void PeriodicProcessing (void); -}; - -}} - - - -#endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/broker/ManagementExchange.cpp b/cpp/src/qpid/broker/ManagementExchange.cpp deleted file mode 100644 index 5d829477ba..0000000000 --- a/cpp/src/qpid/broker/ManagementExchange.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementExchange.h" -#include "qpid/log/Statement.h" - -using namespace qpid::broker; -using namespace qpid::framing; -using namespace qpid::sys; - -ManagementExchange::ManagementExchange (const string& _name) : - Exchange (_name), TopicExchange(_name) {} -ManagementExchange::ManagementExchange (const std::string& _name, - bool _durable, - const FieldTable& _args) : - Exchange (_name, _durable, _args), - TopicExchange(_name, _durable, _args) {} - - -bool ManagementExchange::bind (Queue::shared_ptr queue, - const string& routingKey, - const FieldTable* args) -{ - bool result = TopicExchange::bind (queue, routingKey, args); - - // Notify the management agent that a new management client has bound to the - // exchange. - if (result) - managementAgent->clientAdded (); - - return result; -} - -void ManagementExchange::route (Deliverable& msg, - const string& routingKey, - const FieldTable* args) -{ - // Intercept management commands - if (routingKey.length () > 7 && - routingKey.substr (0, 7).compare ("method.") == 0) - { - managementAgent->dispatchCommand (msg, routingKey, args); - return; - } - - TopicExchange::route (msg, routingKey, args); -} - -void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent) -{ - managementAgent = agent; -} - - -ManagementExchange::~ManagementExchange() {} - -const std::string ManagementExchange::typeName("management"); - diff --git a/cpp/src/qpid/broker/ManagementExchange.h b/cpp/src/qpid/broker/ManagementExchange.h deleted file mode 100644 index 56c051a7f8..0000000000 --- a/cpp/src/qpid/broker/ManagementExchange.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _ManagementExchange_ -#define _ManagementExchange_ - -#include "TopicExchange.h" -#include "ManagementAgent.h" - -namespace qpid { -namespace broker { - -class ManagementExchange : public virtual TopicExchange -{ - private: - ManagementAgent::shared_ptr managementAgent; - - public: - static const std::string typeName; - - ManagementExchange (const string& name); - ManagementExchange (const string& _name, bool _durable, - const qpid::framing::FieldTable& _args); - - virtual std::string getType() const { return typeName; } - - virtual bool bind (Queue::shared_ptr queue, - const string& routingKey, - const qpid::framing::FieldTable* args); - - virtual void route (Deliverable& msg, - const string& routingKey, - const qpid::framing::FieldTable* args); - - void setManagmentAgent (ManagementAgent::shared_ptr agent); - - virtual ~ManagementExchange(); -}; - - -} -} - -#endif diff --git a/cpp/src/qpid/broker/ManagementObject.cpp b/cpp/src/qpid/broker/ManagementObject.cpp deleted file mode 100644 index c536d96b1b..0000000000 --- a/cpp/src/qpid/broker/ManagementObject.cpp +++ /dev/null @@ -1,54 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" - -using namespace qpid::framing; -using namespace qpid::broker; -using namespace qpid::sys; - -void ManagementObject::schemaItem (Buffer& buf, - uint8_t typeCode, - std::string name, - std::string description, - bool isConfig, - bool isIndex) -{ - uint8_t flags = - (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0); - - buf.putOctet (flags); - buf.putOctet (typeCode); - buf.putShortString (name); - buf.putShortString (description); -} - -void ManagementObject::schemaListEnd (Buffer& buf) -{ - buf.putOctet (FLAG_END); -} - -void ManagementObject::writeTimestamps (Buffer& buf) -{ - buf.putLongLong (uint64_t (Duration (now ()))); - buf.putLongLong (createTime); - buf.putLongLong (destroyTime); -} diff --git a/cpp/src/qpid/broker/ManagementObject.h b/cpp/src/qpid/broker/ManagementObject.h deleted file mode 100644 index 237f2f3d79..0000000000 --- a/cpp/src/qpid/broker/ManagementObject.h +++ /dev/null @@ -1,117 +0,0 @@ -#ifndef _ManagementObject_ -#define _ManagementObject_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/sys/Time.h" -#include -#include -#include - -namespace qpid { -namespace broker { - -using namespace qpid::framing; -using namespace qpid::sys; - -const uint16_t OBJECT_SYSTEM = 1; -const uint16_t OBJECT_BROKER = 2; -const uint16_t OBJECT_VHOST = 3; -const uint16_t OBJECT_QUEUE = 4; -const uint16_t OBJECT_EXCHANGE = 5; -const uint16_t OBJECT_BINDING = 6; -const uint16_t OBJECT_CLIENT = 7; -const uint16_t OBJECT_SESSION = 8; -const uint16_t OBJECT_DESTINATION = 9; -const uint16_t OBJECT_PRODUCER = 10; -const uint16_t OBJECT_CONSUMER = 11; - - -class ManagementObject -{ - protected: - - uint64_t createTime; - uint64_t destroyTime; - bool configChanged; - bool instChanged; - bool deleted; - - static const uint8_t TYPE_UINT8 = 1; - static const uint8_t TYPE_UINT16 = 2; - static const uint8_t TYPE_UINT32 = 3; - static const uint8_t TYPE_UINT64 = 4; - static const uint8_t TYPE_BOOL = 5; - static const uint8_t TYPE_STRING = 6; - - static const uint8_t FLAG_CONFIG = 0x01; - static const uint8_t FLAG_INDEX = 0x02; - static const uint8_t FLAG_END = 0x80; - - void schemaItem (Buffer& buf, - uint8_t typeCode, - std::string name, - std::string description, - bool isConfig = false, - bool isIndex = false); - void schemaListEnd (Buffer& buf); - void writeTimestamps (Buffer& buf); - - public: - typedef boost::shared_ptr shared_ptr; - - ManagementObject () : destroyTime(0), configChanged(true), - instChanged(true), deleted(false) - { createTime = uint64_t (Duration (now ())); } - virtual ~ManagementObject () {} - - virtual uint16_t getObjectType (void) = 0; - virtual std::string getObjectName (void) = 0; - virtual void writeSchema (Buffer& buf) = 0; - virtual void writeConfig (Buffer& buf) = 0; - virtual void writeInstrumentation (Buffer& buf) = 0; - virtual bool getSchemaNeeded (void) = 0; - virtual void setSchemaNeeded (void) = 0; - - inline bool getConfigChanged (void) { return configChanged; } - inline bool getInstChanged (void) { return instChanged; } - inline void setAllChanged (void) - { - configChanged = true; - instChanged = true; - } - - inline void resourceDestroy (void) { - destroyTime = uint64_t (Duration (now ())); - deleted = true; - } - bool isDeleted (void) { return deleted; } - -}; - - typedef std::vector ManagementObjectVector; - -}} - - - -#endif /*!_ManagementObject_*/ diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.cpp b/cpp/src/qpid/broker/ManagementObjectQueue.cpp deleted file mode 100644 index d30cda03a4..0000000000 --- a/cpp/src/qpid/broker/ManagementObjectQueue.cpp +++ /dev/null @@ -1,182 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObjectQueue.h" - -using namespace qpid::broker; -using namespace qpid::sys; -using namespace qpid::framing; - -bool ManagementObjectQueue::schemaNeeded = true; - -ManagementObjectQueue::ManagementObjectQueue (std::string& _name, bool _durable, bool _autoDelete) : - name(_name), durable(_durable), autoDelete(_autoDelete) -{ - msgTotalEnqueues = 0; - msgTotalDequeues = 0; - msgTxEnqueues = 0; - msgTxDequeues = 0; - msgPersistEnqueues = 0; - msgPersistDequeues = 0; - - msgDepth = 0; - msgDepthLow = 0; - msgDepthHigh = 0; - - byteTotalEnqueues = 0; - byteTotalDequeues = 0; - byteTxEnqueues = 0; - byteTxDequeues = 0; - bytePersistEnqueues = 0; - bytePersistDequeues = 0; - - byteDepth = 0; - byteDepthLow = 0; - byteDepthHigh = 0; - - enqueueTxStarts = 0; - enqueueTxCommits = 0; - enqueueTxRejects = 0; - dequeueTxStarts = 0; - dequeueTxCommits = 0; - dequeueTxRejects = 0; - - enqueueTxCount = 0; - enqueueTxCountLow = 0; - enqueueTxCountHigh = 0; - - dequeueTxCount = 0; - dequeueTxCountLow = 0; - dequeueTxCountHigh = 0; - - consumers = 0; - consumersLow = 0; - consumersHigh = 0; -} - -ManagementObjectQueue::~ManagementObjectQueue () {} - -void ManagementObjectQueue::writeSchema (Buffer& buf) -{ - schemaNeeded = false; - - schemaItem (buf, TYPE_STRING, "name", "Queue Name", true, true); - schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); - schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); - - schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued"); - schemaItem (buf, TYPE_UINT64, "msgTxEnqueues", "Transactional messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgTxDequeues", "Transactional messages dequeued"); - schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued"); - schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued"); - schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages"); - schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval"); - schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval"); - schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued"); - schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued"); - schemaItem (buf, TYPE_UINT64, "byteTxEnqueues", "Transactional messages enqueued"); - schemaItem (buf, TYPE_UINT64, "byteTxDequeues", "Transactional messages dequeued"); - schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued"); - schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued"); - schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes"); - schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval"); - schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval"); - schemaItem (buf, TYPE_UINT64, "enqueueTxStarts", "Total enqueue transactions started "); - schemaItem (buf, TYPE_UINT64, "enqueueTxCommits", "Total enqueue transactions committed"); - schemaItem (buf, TYPE_UINT64, "enqueueTxRejects", "Total enqueue transactions rejected"); - schemaItem (buf, TYPE_UINT32, "enqueueTxCount", "Current pending enqueue transactions"); - schemaItem (buf, TYPE_UINT32, "enqueueTxCountLow", "Low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "enqueueTxCountHigh", "High water mark this interval"); - schemaItem (buf, TYPE_UINT64, "dequeueTxStarts", "Total dequeue transactions started "); - schemaItem (buf, TYPE_UINT64, "dequeueTxCommits", "Total dequeue transactions committed"); - schemaItem (buf, TYPE_UINT64, "dequeueTxRejects", "Total dequeue transactions rejected"); - schemaItem (buf, TYPE_UINT32, "dequeueTxCount", "Current pending dequeue transactions"); - schemaItem (buf, TYPE_UINT32, "dequeueTxCountLow", "Transaction low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "dequeueTxCountHigh", "Transaction high water mark this interval"); - schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue"); - schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval"); - schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); - - schemaListEnd (buf); -} - -void ManagementObjectQueue::writeConfig (Buffer& buf) -{ - configChanged = false; - - writeTimestamps (buf); - buf.putShortString (name); - buf.putOctet (durable ? 1 : 0); - buf.putOctet (autoDelete ? 1 : 0); -} - -void ManagementObjectQueue::writeInstrumentation (Buffer& buf) -{ - instChanged = false; - - writeTimestamps (buf); - buf.putShortString (name); - buf.putLongLong (msgTotalEnqueues); - buf.putLongLong (msgTotalDequeues); - buf.putLongLong (msgTxEnqueues); - buf.putLongLong (msgTxDequeues); - buf.putLongLong (msgPersistEnqueues); - buf.putLongLong (msgPersistDequeues); - buf.putLong (msgDepth); - buf.putLong (msgDepthLow); - buf.putLong (msgDepthHigh); - buf.putLongLong (byteTotalEnqueues); - buf.putLongLong (byteTotalDequeues); - buf.putLongLong (byteTxEnqueues); - buf.putLongLong (byteTxDequeues); - buf.putLongLong (bytePersistEnqueues); - buf.putLongLong (bytePersistDequeues); - buf.putLong (byteDepth); - buf.putLong (byteDepthLow); - buf.putLong (byteDepthHigh); - buf.putLongLong (enqueueTxStarts); - buf.putLongLong (enqueueTxCommits); - buf.putLongLong (enqueueTxRejects); - buf.putLong (enqueueTxCount); - buf.putLong (enqueueTxCountLow); - buf.putLong (enqueueTxCountHigh); - buf.putLongLong (dequeueTxStarts); - buf.putLongLong (dequeueTxCommits); - buf.putLongLong (dequeueTxRejects); - buf.putLong (dequeueTxCount); - buf.putLong (dequeueTxCountLow); - buf.putLong (dequeueTxCountHigh); - buf.putLong (consumers); - buf.putLong (consumersLow); - buf.putLong (consumersHigh); - - msgDepthLow = msgDepth; - msgDepthHigh = msgDepth; - byteDepthLow = byteDepth; - byteDepthHigh = byteDepth; - enqueueTxCountLow = enqueueTxCount; - enqueueTxCountHigh = enqueueTxCount; - dequeueTxCountLow = dequeueTxCount; - dequeueTxCountHigh = dequeueTxCount; - consumersLow = consumers; - consumersHigh = consumers; -} diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.h b/cpp/src/qpid/broker/ManagementObjectQueue.h deleted file mode 100644 index cb2d399b76..0000000000 --- a/cpp/src/qpid/broker/ManagementObjectQueue.h +++ /dev/null @@ -1,180 +0,0 @@ -#ifndef _ManagementObjectQueue_ -#define _ManagementObjectQueue_ - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ManagementObject.h" - -namespace qpid { -namespace broker { - -const uint32_t MSG_MASK_TX = 1; // Transactional message -const uint32_t MSG_MASK_PERSIST = 2; // Persistent message - -class ManagementObjectQueue : public ManagementObject -{ - private: - - static bool schemaNeeded; - - std::string objectName; - std::string name; - bool durable; - bool autoDelete; - - uint64_t msgTotalEnqueues; // Total messages enqueued - uint64_t msgTotalDequeues; // Total messages dequeued - uint64_t msgTxEnqueues; // Transactional messages enqueued - uint64_t msgTxDequeues; // Transactional messages dequeued - uint64_t msgPersistEnqueues; // Persistent messages enqueued - uint64_t msgPersistDequeues; // Persistent messages dequeued - - uint32_t msgDepth; // Current size of queue in messages - uint32_t msgDepthLow; // Low-water queue size, this interval - uint32_t msgDepthHigh; // High-water queue size, this interval - - uint64_t byteTotalEnqueues; // Total messages enqueued - uint64_t byteTotalDequeues; // Total messages dequeued - uint64_t byteTxEnqueues; // Transactional messages enqueued - uint64_t byteTxDequeues; // Transactional messages dequeued - uint64_t bytePersistEnqueues; // Persistent messages enqueued - uint64_t bytePersistDequeues; // Persistent messages dequeued - - uint32_t byteDepth; // Current size of queue in bytes - uint32_t byteDepthLow; // Low-water mark this interval - uint32_t byteDepthHigh; // High-water mark this interval - - uint64_t enqueueTxStarts; // Total enqueue transactions started - uint64_t enqueueTxCommits; // Total enqueue transactions committed - uint64_t enqueueTxRejects; // Total enqueue transactions rejected - - uint32_t enqueueTxCount; // Current pending enqueue transactions - uint32_t enqueueTxCountLow; // Low water mark this interval - uint32_t enqueueTxCountHigh; // High water mark this interval - - uint64_t dequeueTxStarts; // Total dequeue transactions started - uint64_t dequeueTxCommits; // Total dequeue transactions committed - uint64_t dequeueTxRejects; // Total dequeue transactions rejected - - uint32_t dequeueTxCount; // Current pending dequeue transactions - uint32_t dequeueTxCountLow; // Low water mark this interval - uint32_t dequeueTxCountHigh; // High water mark this interval - - uint32_t consumers; // Current consumers on queue - uint32_t consumersLow; // Low water mark this interval - uint32_t consumersHigh; // High water mark this interval - - uint16_t getObjectType (void) { return OBJECT_QUEUE; } - std::string getObjectName (void) { return objectName; } - void writeSchema (Buffer& buf); - void writeConfig (Buffer& buf); - void writeInstrumentation (Buffer& buf); - bool getSchemaNeeded (void) { return schemaNeeded; } - void setSchemaNeeded (void) { schemaNeeded = true; } - - inline void adjustQueueHiLo (void){ - if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; - if (msgDepth < msgDepthLow) msgDepthLow = msgDepth; - - if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth; - if (byteDepth < byteDepthLow) byteDepthLow = byteDepth; - instChanged = true; - } - - inline void adjustTxHiLo (void){ - if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount; - if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount; - if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount; - if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount; - instChanged = true; - } - - inline void adjustConsumerHiLo (void){ - if (consumers > consumersHigh) consumersHigh = consumers; - if (consumers < consumersLow) consumersLow = consumers; - instChanged = true; - } - - public: - - typedef boost::shared_ptr shared_ptr; - - ManagementObjectQueue (std::string& name, bool durable, bool autoDelete); - ~ManagementObjectQueue (void); - - // The following mask contents are used to describe enqueued or dequeued - // messages when counting statistics. - - inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){ - msgTotalEnqueues++; - byteTotalEnqueues += bytes; - - if (attrMask & MSG_MASK_TX){ - msgTxEnqueues++; - byteTxEnqueues += bytes; - } - - if (attrMask & MSG_MASK_PERSIST){ - msgPersistEnqueues++; - bytePersistEnqueues += bytes; - } - - msgDepth++; - byteDepth += bytes; - adjustQueueHiLo (); - } - - inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){ - msgTotalDequeues++; - byteTotalDequeues += bytes; - - if (attrMask & MSG_MASK_TX){ - msgTxDequeues++; - byteTxDequeues += bytes; - } - - if (attrMask & MSG_MASK_PERSIST){ - msgPersistDequeues++; - bytePersistDequeues += bytes; - } - - msgDepth--; - byteDepth -= bytes; - adjustQueueHiLo (); - } - - inline void incConsumers (void){ - consumers++; - adjustConsumerHiLo (); - } - - inline void decConsumers (void){ - consumers--; - adjustConsumerHiLo (); - } -}; - -}} - - - -#endif /*!_ManagementObjectQueue_*/ diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 17f2d8ba91..f247312b60 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -35,7 +35,7 @@ #include "PersistableQueue.h" #include "QueuePolicy.h" #include "QueueBindings.h" -#include "ManagementObjectQueue.h" +#include "management/ManagementObjectQueue.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp index bc572e4238..31eab33fe2 100644 --- a/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/cpp/src/qpid/broker/QueueRegistry.cpp @@ -19,8 +19,8 @@ * */ #include "QueueRegistry.h" -#include "ManagementAgent.h" -#include "ManagementObjectQueue.h" +#include "management/ManagementAgent.h" +#include "management/ManagementObjectQueue.h" #include "qpid/log/Statement.h" #include #include diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h index 03b4778f7a..8dc5539051 100644 --- a/cpp/src/qpid/broker/QueueRegistry.h +++ b/cpp/src/qpid/broker/QueueRegistry.h @@ -24,7 +24,7 @@ #include #include "qpid/sys/Mutex.h" #include "Queue.h" -#include "ManagementAgent.h" +#include "management/ManagementAgent.h" namespace qpid { namespace broker { diff --git a/cpp/src/qpid/broker/management/ManagementAgent.cpp b/cpp/src/qpid/broker/management/ManagementAgent.cpp new file mode 100644 index 0000000000..1121b7bc6b --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementAgent.cpp @@ -0,0 +1,204 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementAgent.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/log/Statement.h" +#include +#include +#include +#include + +using namespace qpid::framing; +using namespace qpid::broker; +using namespace qpid::sys; + +ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval) +{ + timer.add (TimerTask::shared_ptr (new Periodic(*this, interval))); +} + +void ManagementAgent::setExchange (Exchange::shared_ptr _exchange) +{ + exchange = _exchange; +} + +void ManagementAgent::addObject (ManagementObject::shared_ptr object) +{ + managementObjects.push_back (object); + QPID_LOG(info, "Management Object Added"); +} + +ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) + : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {} + +void ManagementAgent::Periodic::fire () +{ + agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval))); + agent.PeriodicProcessing (); +} + +void ManagementAgent::clientAdded (void) +{ + for (ManagementObjectVector::iterator iter = managementObjects.begin (); + iter != managementObjects.end (); + iter++) + { + ManagementObject::shared_ptr object = *iter; + object->setAllChanged (); + object->setSchemaNeeded (); + } +} + +void ManagementAgent::PeriodicProcessing (void) +{ +#define BUFSIZE 65536 +#define THRESHOLD 16384 + char msgChars[BUFSIZE]; + Buffer msgBuffer (msgChars, BUFSIZE); + uint32_t contentSize; + std::list deleteList; + + if (managementObjects.empty ()) + return; + + Message::shared_ptr msg (new Message ()); + + // Build the magic number for the management message. + msgBuffer.putOctet ('A'); + msgBuffer.putOctet ('M'); + msgBuffer.putOctet ('0'); + msgBuffer.putOctet ('1'); + + for (uint32_t idx = 0; idx < managementObjects.size (); idx++) + { + ManagementObject::shared_ptr object = managementObjects[idx]; + + if (object->getSchemaNeeded ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('S'); // opcode = Schema Record + msgBuffer.putOctet (0); // content-class = N/A + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeSchema (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->getConfigChanged ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('C'); // content-class = Configuration + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeConfig (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->getInstChanged ()) + { + uint32_t startAvail = msgBuffer.available (); + uint32_t recordLength; + + msgBuffer.putOctet ('C'); // opcode = Content Record + msgBuffer.putOctet ('I'); // content-class = Instrumentation + msgBuffer.putShort (object->getObjectType ()); + msgBuffer.record (); // Record the position of the length field + msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length + + object->writeInstrumentation (msgBuffer); + recordLength = startAvail - msgBuffer.available (); + msgBuffer.restore (true); // Restore pointer to length field + msgBuffer.putLong (recordLength); + msgBuffer.restore (); // Re-restore to get to the end of the buffer + } + + if (object->isDeleted ()) + deleteList.push_back (idx); + + // Temporary protection against buffer overrun. + // This needs to be replaced with frame fragmentation. + if (msgBuffer.available () < THRESHOLD) + break; + } + + msgBuffer.putOctet ('X'); // End-of-message + msgBuffer.putOctet (0); + msgBuffer.putShort (0); + msgBuffer.putLong (8); + + contentSize = BUFSIZE - msgBuffer.available (); + msgBuffer.reset (); + + AMQFrame method (0, MessageTransferBody(ProtocolVersion(), + 0, "qpid.management", 0, 0)); + AMQFrame header (0, AMQHeaderBody()); + AMQFrame content; + + content.setBody(AMQContentBody()); + content.castBody()->decode(msgBuffer, contentSize); + + method.setEof (false); + header.setBof (false); + header.setEof (false); + content.setBof (false); + + msg->getFrames().append(method); + msg->getFrames().append(header); + + MessageProperties* props = msg->getFrames().getHeaders()->get(true); + props->setContentLength(contentSize); + msg->getFrames().append(content); + + DeliverableMessage deliverable (msg); + exchange->route (deliverable, "mgmt", 0); + + // Delete flagged objects + for (std::list::reverse_iterator iter = deleteList.rbegin (); + iter != deleteList.rend (); + iter++) + { + managementObjects.erase (managementObjects.begin () + *iter); + } + deleteList.clear (); +} + +void ManagementAgent::dispatchCommand (Deliverable& /*msg*/, + const string& /*routingKey*/, + const FieldTable* /*args*/) +{ +} + diff --git a/cpp/src/qpid/broker/management/ManagementAgent.h b/cpp/src/qpid/broker/management/ManagementAgent.h new file mode 100644 index 0000000000..c3cfa58291 --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementAgent.h @@ -0,0 +1,73 @@ +#ifndef _ManagementAgent_ +#define _ManagementAgent_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/Options.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Timer.h" +#include "ManagementObject.h" +#include + +namespace qpid { +namespace broker { + + +class ManagementAgent +{ + public: + + typedef boost::shared_ptr shared_ptr; + + ManagementAgent (uint16_t interval); + + void setExchange (Exchange::shared_ptr exchange); + void addObject (ManagementObject::shared_ptr object); + void clientAdded (void); + void dispatchCommand (Deliverable& msg, + const string& routingKey, + const FieldTable* args); + + private: + + struct Periodic : public TimerTask + { + ManagementAgent& agent; + + Periodic (ManagementAgent& agent, uint32_t seconds); + ~Periodic () {} + void fire (); + }; + + ManagementObjectVector managementObjects; + Timer timer; + Exchange::shared_ptr exchange; + uint16_t interval; + + void PeriodicProcessing (void); +}; + +}} + + + +#endif /*!_ManagementAgent_*/ diff --git a/cpp/src/qpid/broker/management/ManagementExchange.cpp b/cpp/src/qpid/broker/management/ManagementExchange.cpp new file mode 100644 index 0000000000..5d829477ba --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementExchange.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementExchange.h" +#include "qpid/log/Statement.h" + +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + +ManagementExchange::ManagementExchange (const string& _name) : + Exchange (_name), TopicExchange(_name) {} +ManagementExchange::ManagementExchange (const std::string& _name, + bool _durable, + const FieldTable& _args) : + Exchange (_name, _durable, _args), + TopicExchange(_name, _durable, _args) {} + + +bool ManagementExchange::bind (Queue::shared_ptr queue, + const string& routingKey, + const FieldTable* args) +{ + bool result = TopicExchange::bind (queue, routingKey, args); + + // Notify the management agent that a new management client has bound to the + // exchange. + if (result) + managementAgent->clientAdded (); + + return result; +} + +void ManagementExchange::route (Deliverable& msg, + const string& routingKey, + const FieldTable* args) +{ + // Intercept management commands + if (routingKey.length () > 7 && + routingKey.substr (0, 7).compare ("method.") == 0) + { + managementAgent->dispatchCommand (msg, routingKey, args); + return; + } + + TopicExchange::route (msg, routingKey, args); +} + +void ManagementExchange::setManagmentAgent (ManagementAgent::shared_ptr agent) +{ + managementAgent = agent; +} + + +ManagementExchange::~ManagementExchange() {} + +const std::string ManagementExchange::typeName("management"); + diff --git a/cpp/src/qpid/broker/management/ManagementExchange.h b/cpp/src/qpid/broker/management/ManagementExchange.h new file mode 100644 index 0000000000..c38f38d0a1 --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementExchange.h @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _ManagementExchange_ +#define _ManagementExchange_ + +#include "qpid/broker/TopicExchange.h" +#include "ManagementAgent.h" + +namespace qpid { +namespace broker { + +class ManagementExchange : public virtual TopicExchange +{ + private: + ManagementAgent::shared_ptr managementAgent; + + public: + static const std::string typeName; + + ManagementExchange (const string& name); + ManagementExchange (const string& _name, bool _durable, + const qpid::framing::FieldTable& _args); + + virtual std::string getType() const { return typeName; } + + virtual bool bind (Queue::shared_ptr queue, + const string& routingKey, + const qpid::framing::FieldTable* args); + + virtual void route (Deliverable& msg, + const string& routingKey, + const qpid::framing::FieldTable* args); + + void setManagmentAgent (ManagementAgent::shared_ptr agent); + + virtual ~ManagementExchange(); +}; + + +} +} + +#endif diff --git a/cpp/src/qpid/broker/management/ManagementObject.cpp b/cpp/src/qpid/broker/management/ManagementObject.cpp new file mode 100644 index 0000000000..c536d96b1b --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementObject.cpp @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" + +using namespace qpid::framing; +using namespace qpid::broker; +using namespace qpid::sys; + +void ManagementObject::schemaItem (Buffer& buf, + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig, + bool isIndex) +{ + uint8_t flags = + (isConfig ? FLAG_CONFIG : 0) | (isIndex ? FLAG_INDEX : 0); + + buf.putOctet (flags); + buf.putOctet (typeCode); + buf.putShortString (name); + buf.putShortString (description); +} + +void ManagementObject::schemaListEnd (Buffer& buf) +{ + buf.putOctet (FLAG_END); +} + +void ManagementObject::writeTimestamps (Buffer& buf) +{ + buf.putLongLong (uint64_t (Duration (now ()))); + buf.putLongLong (createTime); + buf.putLongLong (destroyTime); +} diff --git a/cpp/src/qpid/broker/management/ManagementObject.h b/cpp/src/qpid/broker/management/ManagementObject.h new file mode 100644 index 0000000000..107da62e67 --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementObject.h @@ -0,0 +1,117 @@ +#ifndef _ManagementObject_ +#define _ManagementObject_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/Time.h" +#include +#include +#include + +namespace qpid { +namespace broker { + +using namespace qpid::framing; +using namespace qpid::sys; + +const uint16_t OBJECT_SYSTEM = 1; +const uint16_t OBJECT_BROKER = 2; +const uint16_t OBJECT_VHOST = 3; +const uint16_t OBJECT_QUEUE = 4; +const uint16_t OBJECT_EXCHANGE = 5; +const uint16_t OBJECT_BINDING = 6; +const uint16_t OBJECT_CLIENT = 7; +const uint16_t OBJECT_SESSION = 8; +const uint16_t OBJECT_DESTINATION = 9; +const uint16_t OBJECT_PRODUCER = 10; +const uint16_t OBJECT_CONSUMER = 11; + + +class ManagementObject +{ + protected: + + uint64_t createTime; + uint64_t destroyTime; + bool configChanged; + bool instChanged; + bool deleted; + + static const uint8_t TYPE_UINT8 = 1; + static const uint8_t TYPE_UINT16 = 2; + static const uint8_t TYPE_UINT32 = 3; + static const uint8_t TYPE_UINT64 = 4; + static const uint8_t TYPE_BOOL = 5; + static const uint8_t TYPE_STRING = 6; + + static const uint8_t FLAG_CONFIG = 0x01; + static const uint8_t FLAG_INDEX = 0x02; + static const uint8_t FLAG_END = 0x80; + + void schemaItem (Buffer& buf, + uint8_t typeCode, + std::string name, + std::string description, + bool isConfig = false, + bool isIndex = false); + void schemaListEnd (Buffer& buf); + void writeTimestamps (Buffer& buf); + + public: + typedef boost::shared_ptr shared_ptr; + + ManagementObject () : destroyTime(0), configChanged(true), + instChanged(true), deleted(false) + { createTime = uint64_t (Duration (now ())); } + virtual ~ManagementObject () {} + + virtual uint16_t getObjectType (void) = 0; + virtual std::string getObjectName (void) = 0; + virtual void writeSchema (Buffer& buf) = 0; + virtual void writeConfig (Buffer& buf) = 0; + virtual void writeInstrumentation (Buffer& buf) = 0; + virtual bool getSchemaNeeded (void) = 0; + virtual void setSchemaNeeded (void) = 0; + + inline bool getConfigChanged (void) { return configChanged; } + virtual bool getInstChanged (void) { return instChanged; } + inline void setAllChanged (void) + { + configChanged = true; + instChanged = true; + } + + inline void resourceDestroy (void) { + destroyTime = uint64_t (Duration (now ())); + deleted = true; + } + bool isDeleted (void) { return deleted; } + +}; + + typedef std::vector ManagementObjectVector; + +}} + + + +#endif /*!_ManagementObject_*/ diff --git a/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp b/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp new file mode 100644 index 0000000000..378017d5a8 --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementObjectBroker.cpp @@ -0,0 +1,98 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "config.h" +#include "qpid/broker/Broker.h" +#include "ManagementObjectBroker.h" + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; + +bool ManagementObjectBroker::schemaNeeded = true; + +ManagementObjectBroker::ManagementObjectBroker (const Options& _conf) +{ + Broker::Options& conf = (Broker::Options&) _conf; + + sysId = "sysId"; + port = conf.port; + workerThreads = conf.workerThreads; + maxConns = conf.maxConnections; + connBacklog = conf.connectionBacklog; + stagingThreshold = conf.stagingThreshold; + storeLib = conf.store; + asyncStore = conf.storeAsync; + mgmtPubInterval = conf.mgmtPubInterval; + initialDiskPageSize = 0; + initialPagesPerQueue = 0; + clusterName = ""; + version = PACKAGE_VERSION; +} + +ManagementObjectBroker::~ManagementObjectBroker () {} + +void ManagementObjectBroker::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaItem (buf, TYPE_STRING, "sysId", "System ID", true, true); + schemaItem (buf, TYPE_UINT16, "port", "TCP Port for AMQP Service", true); + schemaItem (buf, TYPE_UINT16, "workerThreads", "Thread pool size", true); + schemaItem (buf, TYPE_UINT16, "maxConns", "Maximum allowed connections", true); + schemaItem (buf, TYPE_UINT16, "connBacklog", + "Connection backlog limit for listening socket", true); + schemaItem (buf, TYPE_UINT32, "stagingThreshold", + "Broker stages messages over this size to disk", true); + schemaItem (buf, TYPE_STRING, "storeLib", "Name of persistent storage library", true); + schemaItem (buf, TYPE_UINT8, "asyncStore", "Use async persistent store", true); + schemaItem (buf, TYPE_UINT16, "mgmtPubInterval", "Interval for management broadcasts", true); + schemaItem (buf, TYPE_UINT32, "initialDiskPageSize", + "Number of disk pages allocated for storage", true); + schemaItem (buf, TYPE_UINT32, "initialPagesPerQueue", + "Number of disk pages allocated per queue", true); + schemaItem (buf, TYPE_STRING, "clusterName", + "Name of cluster this server is a member of, zero-length for standalone server", true); + schemaItem (buf, TYPE_STRING, "version", "Running software version", true); + + schemaListEnd (buf); +} + +void ManagementObjectBroker::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putShortString (sysId); + buf.putShort (port); + buf.putShort (workerThreads); + buf.putShort (maxConns); + buf.putShort (connBacklog); + buf.putLong (stagingThreshold); + buf.putShortString (storeLib); + buf.putOctet (asyncStore ? 1 : 0); + buf.putShort (mgmtPubInterval); + buf.putLong (initialDiskPageSize); + buf.putLong (initialPagesPerQueue); + buf.putShortString (clusterName); + buf.putShortString (version); +} + diff --git a/cpp/src/qpid/broker/management/ManagementObjectBroker.h b/cpp/src/qpid/broker/management/ManagementObjectBroker.h new file mode 100644 index 0000000000..f83df061af --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementObjectBroker.h @@ -0,0 +1,75 @@ +#ifndef _ManagementObjectBroker_ +#define _ManagementObjectBroker_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" +#include "qpid/Options.h" +#include "boost/shared_ptr.hpp" + +namespace qpid { +namespace broker { + +class ManagementObjectBroker : public ManagementObject +{ + public: + + typedef boost::shared_ptr shared_ptr; + + ManagementObjectBroker (const Options& conf); + ~ManagementObjectBroker (void); + + private: + + static bool schemaNeeded; + + std::string objectName; + + std::string sysId; + uint16_t port; + uint16_t workerThreads; + uint16_t maxConns; + uint16_t connBacklog; + uint32_t stagingThreshold; + std::string storeLib; + bool asyncStore; + uint16_t mgmtPubInterval; + uint32_t initialDiskPageSize; + uint32_t initialPagesPerQueue; + std::string clusterName; + std::string version; + + uint16_t getObjectType (void) { return OBJECT_BROKER; } + std::string getObjectName (void) { return objectName; } + void writeSchema (Buffer& buf); + void writeConfig (Buffer& buf); + void writeInstrumentation (Buffer& /*buf*/) {} + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + + inline bool getInstChanged (void) { return false; } +}; + +}} + + +#endif /*!_ManagementObjectBroker_*/ diff --git a/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp b/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp new file mode 100644 index 0000000000..70913ea910 --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp @@ -0,0 +1,186 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObjectQueue.h" + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; + +bool ManagementObjectQueue::schemaNeeded = true; + +ManagementObjectQueue::ManagementObjectQueue (std::string& _name, + bool _durable, bool _autoDelete) : + vhostName("/"), name(_name), durable(_durable), autoDelete(_autoDelete) +{ + msgTotalEnqueues = 0; + msgTotalDequeues = 0; + msgTxEnqueues = 0; + msgTxDequeues = 0; + msgPersistEnqueues = 0; + msgPersistDequeues = 0; + + msgDepth = 0; + msgDepthLow = 0; + msgDepthHigh = 0; + + byteTotalEnqueues = 0; + byteTotalDequeues = 0; + byteTxEnqueues = 0; + byteTxDequeues = 0; + bytePersistEnqueues = 0; + bytePersistDequeues = 0; + + byteDepth = 0; + byteDepthLow = 0; + byteDepthHigh = 0; + + enqueueTxStarts = 0; + enqueueTxCommits = 0; + enqueueTxRejects = 0; + dequeueTxStarts = 0; + dequeueTxCommits = 0; + dequeueTxRejects = 0; + + enqueueTxCount = 0; + enqueueTxCountLow = 0; + enqueueTxCountHigh = 0; + + dequeueTxCount = 0; + dequeueTxCountLow = 0; + dequeueTxCountHigh = 0; + + consumers = 0; + consumersLow = 0; + consumersHigh = 0; +} + +ManagementObjectQueue::~ManagementObjectQueue () {} + +void ManagementObjectQueue::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaItem (buf, TYPE_STRING, "vhostRef", "Virtual Host Ref", true, true); + schemaItem (buf, TYPE_STRING, "name", "Queue Name", true, true); + schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); + schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); + + schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgTxEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTxDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages"); + schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval"); + schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval"); + schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "byteTxEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTxDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes"); + schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval"); + schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval"); + schemaItem (buf, TYPE_UINT64, "enqueueTxStarts", "Total enqueue transactions started "); + schemaItem (buf, TYPE_UINT64, "enqueueTxCommits", "Total enqueue transactions committed"); + schemaItem (buf, TYPE_UINT64, "enqueueTxRejects", "Total enqueue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCount", "Current pending enqueue transactions"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCountLow", "Low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCountHigh", "High water mark this interval"); + schemaItem (buf, TYPE_UINT64, "dequeueTxStarts", "Total dequeue transactions started "); + schemaItem (buf, TYPE_UINT64, "dequeueTxCommits", "Total dequeue transactions committed"); + schemaItem (buf, TYPE_UINT64, "dequeueTxRejects", "Total dequeue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCount", "Current pending dequeue transactions"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCountLow", "Transaction low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCountHigh", "Transaction high water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue"); + schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); + + schemaListEnd (buf); +} + +void ManagementObjectQueue::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putShortString (vhostName); + buf.putShortString (name); + buf.putOctet (durable ? 1 : 0); + buf.putOctet (autoDelete ? 1 : 0); +} + +void ManagementObjectQueue::writeInstrumentation (Buffer& buf) +{ + instChanged = false; + + writeTimestamps (buf); + buf.putShortString (vhostName); + buf.putShortString (name); + buf.putLongLong (msgTotalEnqueues); + buf.putLongLong (msgTotalDequeues); + buf.putLongLong (msgTxEnqueues); + buf.putLongLong (msgTxDequeues); + buf.putLongLong (msgPersistEnqueues); + buf.putLongLong (msgPersistDequeues); + buf.putLong (msgDepth); + buf.putLong (msgDepthLow); + buf.putLong (msgDepthHigh); + buf.putLongLong (byteTotalEnqueues); + buf.putLongLong (byteTotalDequeues); + buf.putLongLong (byteTxEnqueues); + buf.putLongLong (byteTxDequeues); + buf.putLongLong (bytePersistEnqueues); + buf.putLongLong (bytePersistDequeues); + buf.putLong (byteDepth); + buf.putLong (byteDepthLow); + buf.putLong (byteDepthHigh); + buf.putLongLong (enqueueTxStarts); + buf.putLongLong (enqueueTxCommits); + buf.putLongLong (enqueueTxRejects); + buf.putLong (enqueueTxCount); + buf.putLong (enqueueTxCountLow); + buf.putLong (enqueueTxCountHigh); + buf.putLongLong (dequeueTxStarts); + buf.putLongLong (dequeueTxCommits); + buf.putLongLong (dequeueTxRejects); + buf.putLong (dequeueTxCount); + buf.putLong (dequeueTxCountLow); + buf.putLong (dequeueTxCountHigh); + buf.putLong (consumers); + buf.putLong (consumersLow); + buf.putLong (consumersHigh); + + msgDepthLow = msgDepth; + msgDepthHigh = msgDepth; + byteDepthLow = byteDepth; + byteDepthHigh = byteDepth; + enqueueTxCountLow = enqueueTxCount; + enqueueTxCountHigh = enqueueTxCount; + dequeueTxCountLow = dequeueTxCount; + dequeueTxCountHigh = dequeueTxCount; + consumersLow = consumers; + consumersHigh = consumers; +} diff --git a/cpp/src/qpid/broker/management/ManagementObjectQueue.h b/cpp/src/qpid/broker/management/ManagementObjectQueue.h new file mode 100644 index 0000000000..4a0608c7d0 --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementObjectQueue.h @@ -0,0 +1,181 @@ +#ifndef _ManagementObjectQueue_ +#define _ManagementObjectQueue_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" + +namespace qpid { +namespace broker { + +const uint32_t MSG_MASK_TX = 1; // Transactional message +const uint32_t MSG_MASK_PERSIST = 2; // Persistent message + +class ManagementObjectQueue : public ManagementObject +{ + private: + + static bool schemaNeeded; + + std::string objectName; + std::string vhostName; + std::string name; + bool durable; + bool autoDelete; + + uint64_t msgTotalEnqueues; // Total messages enqueued + uint64_t msgTotalDequeues; // Total messages dequeued + uint64_t msgTxEnqueues; // Transactional messages enqueued + uint64_t msgTxDequeues; // Transactional messages dequeued + uint64_t msgPersistEnqueues; // Persistent messages enqueued + uint64_t msgPersistDequeues; // Persistent messages dequeued + + uint32_t msgDepth; // Current size of queue in messages + uint32_t msgDepthLow; // Low-water queue size, this interval + uint32_t msgDepthHigh; // High-water queue size, this interval + + uint64_t byteTotalEnqueues; // Total messages enqueued + uint64_t byteTotalDequeues; // Total messages dequeued + uint64_t byteTxEnqueues; // Transactional messages enqueued + uint64_t byteTxDequeues; // Transactional messages dequeued + uint64_t bytePersistEnqueues; // Persistent messages enqueued + uint64_t bytePersistDequeues; // Persistent messages dequeued + + uint32_t byteDepth; // Current size of queue in bytes + uint32_t byteDepthLow; // Low-water mark this interval + uint32_t byteDepthHigh; // High-water mark this interval + + uint64_t enqueueTxStarts; // Total enqueue transactions started + uint64_t enqueueTxCommits; // Total enqueue transactions committed + uint64_t enqueueTxRejects; // Total enqueue transactions rejected + + uint32_t enqueueTxCount; // Current pending enqueue transactions + uint32_t enqueueTxCountLow; // Low water mark this interval + uint32_t enqueueTxCountHigh; // High water mark this interval + + uint64_t dequeueTxStarts; // Total dequeue transactions started + uint64_t dequeueTxCommits; // Total dequeue transactions committed + uint64_t dequeueTxRejects; // Total dequeue transactions rejected + + uint32_t dequeueTxCount; // Current pending dequeue transactions + uint32_t dequeueTxCountLow; // Low water mark this interval + uint32_t dequeueTxCountHigh; // High water mark this interval + + uint32_t consumers; // Current consumers on queue + uint32_t consumersLow; // Low water mark this interval + uint32_t consumersHigh; // High water mark this interval + + uint16_t getObjectType (void) { return OBJECT_QUEUE; } + std::string getObjectName (void) { return objectName; } + void writeSchema (Buffer& buf); + void writeConfig (Buffer& buf); + void writeInstrumentation (Buffer& buf); + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + + inline void adjustQueueHiLo (void){ + if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth; + if (msgDepth < msgDepthLow) msgDepthLow = msgDepth; + + if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth; + if (byteDepth < byteDepthLow) byteDepthLow = byteDepth; + instChanged = true; + } + + inline void adjustTxHiLo (void){ + if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount; + if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount; + if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount; + if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount; + instChanged = true; + } + + inline void adjustConsumerHiLo (void){ + if (consumers > consumersHigh) consumersHigh = consumers; + if (consumers < consumersLow) consumersLow = consumers; + instChanged = true; + } + + public: + + typedef boost::shared_ptr shared_ptr; + + ManagementObjectQueue (std::string& name, bool durable, bool autoDelete); + ~ManagementObjectQueue (void); + + // The following mask contents are used to describe enqueued or dequeued + // messages when counting statistics. + + inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){ + msgTotalEnqueues++; + byteTotalEnqueues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxEnqueues++; + byteTxEnqueues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistEnqueues++; + bytePersistEnqueues += bytes; + } + + msgDepth++; + byteDepth += bytes; + adjustQueueHiLo (); + } + + inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){ + msgTotalDequeues++; + byteTotalDequeues += bytes; + + if (attrMask & MSG_MASK_TX){ + msgTxDequeues++; + byteTxDequeues += bytes; + } + + if (attrMask & MSG_MASK_PERSIST){ + msgPersistDequeues++; + bytePersistDequeues += bytes; + } + + msgDepth--; + byteDepth -= bytes; + adjustQueueHiLo (); + } + + inline void incConsumers (void){ + consumers++; + adjustConsumerHiLo (); + } + + inline void decConsumers (void){ + consumers--; + adjustConsumerHiLo (); + } +}; + +}} + + + +#endif /*!_ManagementObjectQueue_*/ diff --git a/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp b/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp new file mode 100644 index 0000000000..25a2200106 --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementObjectVhost.cpp @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Broker.h" +#include "ManagementObjectVhost.h" + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; + +bool ManagementObjectVhost::schemaNeeded = true; + +ManagementObjectVhost::ManagementObjectVhost (const Options& /*_conf*/) +{ + name = "/"; +} + +ManagementObjectVhost::~ManagementObjectVhost () {} + +void ManagementObjectVhost::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaItem (buf, TYPE_STRING, "name", "Name of virtual host", true, true); + + schemaListEnd (buf); +} + +void ManagementObjectVhost::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putShortString (name); +} + diff --git a/cpp/src/qpid/broker/management/ManagementObjectVhost.h b/cpp/src/qpid/broker/management/ManagementObjectVhost.h new file mode 100644 index 0000000000..77ac4eea38 --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementObjectVhost.h @@ -0,0 +1,63 @@ +#ifndef _ManagementObjectVhost_ +#define _ManagementObjectVhost_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObject.h" +#include "qpid/Options.h" +#include "boost/shared_ptr.hpp" + +namespace qpid { +namespace broker { + +class ManagementObjectVhost : public ManagementObject +{ + public: + + typedef boost::shared_ptr shared_ptr; + + ManagementObjectVhost (const Options& conf); + ~ManagementObjectVhost (void); + + private: + + static bool schemaNeeded; + + std::string objectName; + + std::string name; + + uint16_t getObjectType (void) { return OBJECT_VHOST; } + std::string getObjectName (void) { return objectName; } + void writeSchema (Buffer& buf); + void writeConfig (Buffer& buf); + void writeInstrumentation (Buffer& /*buf*/) {} + bool getSchemaNeeded (void) { return schemaNeeded; } + void setSchemaNeeded (void) { schemaNeeded = true; } + + inline bool getInstChanged (void) { return false; } +}; + +}} + + +#endif /*!_ManagementObjectVhost_*/ -- cgit v1.2.1