summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2007-10-19 18:57:30 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2007-10-19 18:57:30 +0000
commitb97be677001ec35469d080a98ba88276f2300651 (patch)
tree2d947cee25396e96ad1a49f43d9de82e04f0f96a /cpp/src
parentf25df3a53ca1ef5eec396512fd584823e7f6636d (diff)
downloadqpid-python-b97be677001ec35469d080a98ba88276f2300651.tar.gz
QPID-651 applied patch from Ted
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@586578 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Makefile.am6
-rw-r--r--cpp/src/qpid/broker/Broker.cpp26
-rw-r--r--cpp/src/qpid/broker/Broker.h9
-rw-r--r--cpp/src/qpid/broker/ManagementAgent.cpp187
-rw-r--r--cpp/src/qpid/broker/ManagementAgent.h70
-rw-r--r--cpp/src/qpid/broker/ManagementObject.cpp42
-rw-r--r--cpp/src/qpid/broker/ManagementObject.h92
-rw-r--r--cpp/src/qpid/broker/ManagementObjectQueue.cpp168
-rw-r--r--cpp/src/qpid/broker/ManagementObjectQueue.h179
-rw-r--r--cpp/src/qpid/broker/Queue.cpp23
-rw-r--r--cpp/src/qpid/broker/Queue.h6
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.cpp37
-rw-r--r--cpp/src/qpid/broker/QueueRegistry.h8
-rw-r--r--cpp/src/qpid/framing/Buffer.cpp11
-rw-r--r--cpp/src/qpid/framing/Buffer.h3
15 files changed, 857 insertions, 10 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 7f644a3c8f..784f2db227 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -161,6 +161,9 @@ libqpidbroker_la_SOURCES = \
qpid/broker/FanOutExchange.cpp \
qpid/broker/HeadersExchange.cpp \
qpid/broker/IncomingExecutionContext.cpp \
+ qpid/broker/ManagementAgent.cpp \
+ qpid/broker/ManagementObject.cpp \
+ qpid/broker/ManagementObjectQueue.cpp \
qpid/broker/Message.cpp \
qpid/broker/MessageAdapter.cpp \
qpid/broker/MessageBuilder.cpp \
@@ -254,6 +257,9 @@ nobase_include_HEADERS = \
qpid/broker/HandlerImpl.h \
qpid/broker/HeadersExchange.h \
qpid/broker/IncomingExecutionContext.h \
+ qpid/broker/ManagementAgent.h \
+ qpid/broker/ManagementObject.h \
+ qpid/broker/ManagementObjectQueue.h \
qpid/broker/Message.h \
qpid/broker/MessageAdapter.h \
qpid/broker/MessageBuilder.h \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 84c5703a16..44aeb482de 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -60,8 +60,10 @@ Broker::Options::Options(const std::string& name) :
connectionBacklog(10),
store(),
stagingThreshold(5000000),
- storeDir("/var"),
- storeAsync(false)
+ storeDir("/var"),
+ storeAsync(false),
+ enableMgmt(0),
+ mgmtPubInterval(10)
{
addOptions()
("port,p", optValue(port,"PORT"),
@@ -79,7 +81,11 @@ Broker::Options::Options(const std::string& name) :
("store-directory", optValue(storeDir,"DIR"),
"Store directory location for persistence.")
("store-async", optValue(storeAsync,"yes|no"),
- "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.");
+ "Use async persistence storage - if store supports it, enable AIO 0-DIRECT.")
+ ("mgmt,m", optValue(enableMgmt,"yes|no"),
+ "Enable Management")
+ ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
+ "Management Publish Interval");
}
const std::string empty;
@@ -87,6 +93,7 @@ const std::string amq_direct("amq.direct");
const std::string amq_topic("amq.topic");
const std::string amq_fanout("amq.fanout");
const std::string amq_match("amq.match");
+const std::string qpid_management("qpid.management");
Broker::Broker(const Broker::Options& conf) :
config(conf),
@@ -96,11 +103,24 @@ Broker::Broker(const Broker::Options& conf) :
factory(*this),
dtxManager(store.get())
{
+ if(conf.enableMgmt){
+ managementAgent = ManagementAgent::shared_ptr (new ManagementAgent (conf.mgmtPubInterval));
+ queues.setManagementAgent(managementAgent);
+ }
+
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
exchanges.declare(amq_direct, DirectExchange::typeName);
exchanges.declare(amq_topic, TopicExchange::typeName);
exchanges.declare(amq_fanout, FanOutExchange::typeName);
exchanges.declare(amq_match, HeadersExchange::typeName);
+
+ if(conf.enableMgmt) {
+ QPID_LOG(info, "Management enabled");
+ exchanges.declare(qpid_management, TopicExchange::typeName);
+ managementAgent->setExchange (exchanges.get (qpid_management));
+ }
+ else
+ QPID_LOG(info, "Management not enabled");
if(store.get()) {
store->init(conf.storeDir, conf.storeAsync);
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index b4b82e8433..2018371624 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -30,6 +30,7 @@
#include "MessageStore.h"
#include "QueueRegistry.h"
#include "SessionManager.h"
+#include "ManagementAgent.h"
#include "qpid/Options.h"
#include "qpid/Plugin.h"
#include "qpid/Url.h"
@@ -64,8 +65,10 @@ class Broker : public sys::Runnable, public Plugin::Target
int connectionBacklog;
std::string store;
long stagingThreshold;
- string storeDir;
- bool storeAsync;
+ string storeDir;
+ bool storeAsync;
+ bool enableMgmt;
+ uint16_t mgmtPubInterval;
};
virtual ~Broker();
@@ -107,6 +110,7 @@ class Broker : public sys::Runnable, public Plugin::Target
DtxManager& getDtxManager() { return dtxManager; }
SessionManager& getSessionManager() { return sessionManager; }
+ ManagementAgent::shared_ptr getManagementAgent() { return managementAgent; }
private:
sys::Acceptor& getAcceptor() const;
@@ -123,6 +127,7 @@ class Broker : public sys::Runnable, public Plugin::Target
DtxManager dtxManager;
HandlerUpdaters handlerUpdaters;
SessionManager sessionManager;
+ ManagementAgent::shared_ptr managementAgent;
static MessageStore* createStore(const Options& config);
};
diff --git a/cpp/src/qpid/broker/ManagementAgent.cpp b/cpp/src/qpid/broker/ManagementAgent.cpp
new file mode 100644
index 0000000000..f2ebe991b5
--- /dev/null
+++ b/cpp/src/qpid/broker/ManagementAgent.cpp
@@ -0,0 +1,187 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementAgent.h"
+#include "DeliverableMessage.h"
+#include "qpid/log/Statement.h"
+#include <qpid/broker/Message.h>
+#include <qpid/broker/MessageDelivery.h>
+#include <qpid/framing/AMQFrame.h>
+
+using namespace qpid::framing;
+using namespace qpid::broker;
+using namespace qpid::sys;
+
+ManagementAgent::ManagementAgent (uint16_t _interval) : interval (_interval)
+{
+ timer.add (TimerTask::shared_ptr (new Periodic(*this, interval)));
+}
+
+void ManagementAgent::setExchange (Exchange::shared_ptr exchangePtr)
+{
+ exchange = exchangePtr;
+}
+
+void ManagementAgent::addObject (ManagementObject::shared_ptr object)
+{
+ managementObjects.push_back (object);
+ QPID_LOG(info, "Management Object Added");
+}
+
+void ManagementAgent::deleteObject (ManagementObject::shared_ptr object)
+{
+ managementObjects.remove (object);
+ QPID_LOG (debug, "Management Object Removed");
+}
+
+ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
+ : TimerTask (qpid::sys::Duration (_seconds * qpid::sys::TIME_SEC)), agent(_agent) {}
+
+void ManagementAgent::Periodic::fire ()
+{
+ agent.timer.add (TimerTask::shared_ptr (new Periodic (agent, agent.interval)));
+ agent.PeriodicProcessing ();
+}
+
+void ManagementAgent::PeriodicProcessing (void)
+{
+#define BUFSIZE 65536
+#define THRESHOLD 16384
+ char msgChars[BUFSIZE];
+ Buffer msgBuffer (msgChars, BUFSIZE);
+ uint32_t contentSize;
+
+ //QPID_LOG (debug, "Timer Fired");
+ if (managementObjects.empty ())
+ return;
+
+ Message::shared_ptr msg (new Message ());
+
+ // Build the magic number for the management message.
+ msgBuffer.putOctet ('A');
+ msgBuffer.putOctet ('M');
+ msgBuffer.putOctet ('0');
+ msgBuffer.putOctet ('1');
+
+ for (ManagementObjectList::iterator iter = managementObjects.begin ();
+ iter != managementObjects.end ();
+ iter++)
+ {
+ ManagementObject::shared_ptr objectPtr = *iter;
+
+ //QPID_LOG (debug, " Object Found...");
+
+ if (objectPtr->getSchemaNeeded ())
+ {
+ //QPID_LOG (debug, " Generating Schema");
+ uint32_t startAvail = msgBuffer.available ();
+ uint32_t recordLength;
+
+ msgBuffer.putOctet ('S'); // opcode = Schema Record
+ msgBuffer.putOctet (0); // content-class = N/A
+ msgBuffer.putShort (objectPtr->getObjectType ());
+ msgBuffer.record (); // Record the position of the length field
+ msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
+
+ objectPtr->writeSchema (msgBuffer);
+ recordLength = startAvail - msgBuffer.available ();
+ msgBuffer.restore (true); // Restore pointer to length field
+ msgBuffer.putLong (recordLength);
+ msgBuffer.restore (); // Re-restore to get to the end of the buffer
+ }
+
+ if (objectPtr->getConfigChanged ())
+ {
+ //QPID_LOG (debug, " Generating Config");
+ uint32_t startAvail = msgBuffer.available ();
+ uint32_t recordLength;
+
+ msgBuffer.putOctet ('C'); // opcode = Content Record
+ msgBuffer.putOctet ('C'); // content-class = Configuration
+ msgBuffer.putShort (objectPtr->getObjectType ());
+ msgBuffer.record (); // Record the position of the length field
+ msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
+
+ objectPtr->writeConfig (msgBuffer);
+ recordLength = startAvail - msgBuffer.available ();
+ msgBuffer.restore (true); // Restore pointer to length field
+ msgBuffer.putLong (recordLength);
+ msgBuffer.restore (); // Re-restore to get to the end of the buffer
+ }
+
+ if (objectPtr->getInstChanged ())
+ {
+ //QPID_LOG (debug, " Generating Instrumentation");
+ uint32_t startAvail = msgBuffer.available ();
+ uint32_t recordLength;
+
+ msgBuffer.putOctet ('C'); // opcode = Content Record
+ msgBuffer.putOctet ('I'); // content-class = Instrumentation
+ msgBuffer.putShort (objectPtr->getObjectType ());
+ msgBuffer.record (); // Record the position of the length field
+ msgBuffer.putLong (0xFFFFFFFF); // Placeholder for length
+
+ objectPtr->writeInstrumentation (msgBuffer);
+ recordLength = startAvail - msgBuffer.available ();
+ msgBuffer.restore (true); // Restore pointer to length field
+ msgBuffer.putLong (recordLength);
+ msgBuffer.restore (); // Re-restore to get to the end of the buffer
+ }
+
+ // Temporary protection against buffer overrun.
+ // This needs to be replaced with frame fragmentation.
+ if (msgBuffer.available () < THRESHOLD)
+ break;
+ }
+
+ msgBuffer.putOctet ('X'); // End-of-message
+ msgBuffer.putOctet (0);
+ msgBuffer.putShort (0);
+ msgBuffer.putLong (8);
+
+ contentSize = BUFSIZE - msgBuffer.available ();
+ msgBuffer.reset ();
+
+ AMQFrame method (0, MessageTransferBody(ProtocolVersion(),
+ 0, "qpid.management", 0, 0));
+ AMQFrame header (0, AMQHeaderBody());
+ AMQFrame content;
+
+ content.setBody(AMQContentBody());
+ content.castBody<AMQContentBody>()->decode(msgBuffer, contentSize);
+
+ method.setEof (false);
+ header.setBof (false);
+ header.setEof (false);
+ content.setBof (false);
+
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+
+ MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true);
+ props->setContentLength(contentSize);
+ //msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey("mgmt");
+ msg->getFrames().append(content);
+
+ DeliverableMessage deliverable (msg);
+ exchange->route (deliverable, "mgmt", 0);
+}
+
diff --git a/cpp/src/qpid/broker/ManagementAgent.h b/cpp/src/qpid/broker/ManagementAgent.h
new file mode 100644
index 0000000000..4f3b0a0f5f
--- /dev/null
+++ b/cpp/src/qpid/broker/ManagementAgent.h
@@ -0,0 +1,70 @@
+#ifndef _ManagementAgent_
+#define _ManagementAgent_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/Options.h"
+#include "Exchange.h"
+#include "ManagementObject.h"
+#include "Timer.h"
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+
+class ManagementAgent
+{
+ public:
+
+ typedef boost::shared_ptr<ManagementAgent> shared_ptr;
+
+ ManagementAgent(uint16_t interval);
+
+ void setExchange (Exchange::shared_ptr exchangePtr);
+ void addObject (ManagementObject::shared_ptr object);
+ void deleteObject (ManagementObject::shared_ptr object);
+
+ private:
+
+ struct Periodic : public TimerTask
+ {
+ ManagementAgent& agent;
+
+ Periodic (ManagementAgent& agent, uint32_t seconds);
+ ~Periodic () {}
+ void fire ();
+ };
+
+ ManagementObjectList managementObjects;
+ Timer timer;
+ Exchange::shared_ptr exchange;
+ uint16_t interval;
+
+ void PeriodicProcessing (void);
+};
+
+}}
+
+
+
+#endif /*!_ManagementAgent_*/
diff --git a/cpp/src/qpid/broker/ManagementObject.cpp b/cpp/src/qpid/broker/ManagementObject.cpp
new file mode 100644
index 0000000000..75913ba3ab
--- /dev/null
+++ b/cpp/src/qpid/broker/ManagementObject.cpp
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+
+using namespace qpid::framing;
+using namespace qpid::broker;
+
+void ManagementObject::schemaItem (Buffer& buf,
+ uint8_t typeCode,
+ std::string name,
+ std::string description,
+ bool isConfig)
+{
+ buf.putOctet (isConfig ? 1 : 0);
+ buf.putOctet (typeCode);
+ buf.putShortString (name);
+ buf.putShortString (description);
+}
+
+void ManagementObject::schemaListEnd (Buffer& buf)
+{
+ buf.putOctet (0xFF);
+}
diff --git a/cpp/src/qpid/broker/ManagementObject.h b/cpp/src/qpid/broker/ManagementObject.h
new file mode 100644
index 0000000000..1588aed641
--- /dev/null
+++ b/cpp/src/qpid/broker/ManagementObject.h
@@ -0,0 +1,92 @@
+#ifndef _ManagementObject_
+#define _ManagementObject_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+#include <qpid/framing/Buffer.h>
+#include <boost/shared_ptr.hpp>
+#include <list>
+
+namespace qpid {
+namespace broker {
+
+using namespace qpid::framing;
+
+const uint16_t OBJECT_BROKER = 1;
+const uint16_t OBJECT_SERVER = 2;
+const uint16_t OBJECT_QUEUE = 3;
+const uint16_t OBJECT_EXCHANGE = 4;
+const uint16_t OBJECT_BINDING = 5;
+
+class ManagementObject
+{
+ private:
+
+ qpid::sys::AbsTime createTime;
+ qpid::sys::AbsTime destroyTime;
+
+ protected:
+
+ bool configChanged;
+ bool instChanged;
+
+ static const uint8_t TYPE_UINT8 = 1;
+ static const uint8_t TYPE_UINT16 = 2;
+ static const uint8_t TYPE_UINT32 = 3;
+ static const uint8_t TYPE_UINT64 = 4;
+ static const uint8_t TYPE_BOOL = 5;
+ static const uint8_t TYPE_STRING = 6;
+
+ void schemaItem (Buffer& buf,
+ uint8_t typeCode,
+ std::string name,
+ std::string description,
+ bool isConfig = false);
+ void schemaListEnd (Buffer & buf);
+
+ public:
+ typedef boost::shared_ptr<ManagementObject> shared_ptr;
+
+ ManagementObject () : configChanged(true), instChanged(true) { createTime = qpid::sys::now (); }
+ virtual ~ManagementObject () {}
+
+ virtual uint16_t getObjectType (void) = 0;
+ virtual std::string getObjectName (void) = 0;
+ virtual void writeSchema (Buffer& buf) = 0;
+ virtual void writeConfig (Buffer& buf) = 0;
+ virtual void writeInstrumentation (Buffer& buf) = 0;
+ virtual bool getSchemaNeeded (void) = 0;
+
+ inline bool getConfigChanged (void) { return configChanged; }
+ inline bool getInstChanged (void) { return instChanged; }
+ inline void resourceDestroy (void) { destroyTime = qpid::sys::now (); }
+
+};
+
+ typedef std::list<ManagementObject::shared_ptr> ManagementObjectList;
+
+}}
+
+
+
+#endif /*!_ManagementObject_*/
diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.cpp b/cpp/src/qpid/broker/ManagementObjectQueue.cpp
new file mode 100644
index 0000000000..b81cd7b60d
--- /dev/null
+++ b/cpp/src/qpid/broker/ManagementObjectQueue.cpp
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObjectQueue.h"
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using namespace qpid::framing;
+
+bool ManagementObjectQueue::schemaNeeded = true;
+
+ManagementObjectQueue::ManagementObjectQueue (std::string& _name, bool _durable, bool _autoDelete) :
+ name(_name), durable(_durable), autoDelete(_autoDelete)
+{
+ msgTotalEnqueues = 0;
+ msgTotalDequeues = 0;
+ msgTxEnqueues = 0;
+ msgTxDequeues = 0;
+ msgPersistEnqueues = 0;
+ msgPersistDequeues = 0;
+
+ msgDepth = 0;
+ msgDepthLow = 0;
+ msgDepthHigh = 0;
+
+ byteTotalEnqueues = 0;
+ byteTotalDequeues = 0;
+ byteTxEnqueues = 0;
+ byteTxDequeues = 0;
+ bytePersistEnqueues = 0;
+ bytePersistDequeues = 0;
+
+ byteDepth = 0;
+ byteDepthLow = 0;
+ byteDepthHigh = 0;
+
+ enqueueTxStarts = 0;
+ enqueueTxCommits = 0;
+ enqueueTxRejects = 0;
+ dequeueTxStarts = 0;
+ dequeueTxCommits = 0;
+ dequeueTxRejects = 0;
+
+ enqueueTxCount = 0;
+ enqueueTxCountLow = 0;
+ enqueueTxCountHigh = 0;
+
+ dequeueTxCount = 0;
+ dequeueTxCountLow = 0;
+ dequeueTxCountHigh = 0;
+
+ consumers = 0;
+ consumersLow = 0;
+ consumersHigh = 0;
+}
+
+ManagementObjectQueue::~ManagementObjectQueue () {}
+
+void ManagementObjectQueue::writeSchema (Buffer& buf)
+{
+ schemaItem (buf, TYPE_STRING, "name", "Queue Name", true);
+ schemaItem (buf, TYPE_BOOL, "durable", "Durable", true);
+ schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true);
+
+ schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued");
+ schemaItem (buf, TYPE_UINT64, "msgTxEnqueues", "Transactional messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "msgTxDequeues", "Transactional messages dequeued");
+ schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued");
+ schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages");
+ schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval");
+ schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval");
+ schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued");
+ schemaItem (buf, TYPE_UINT64, "byteTxEnqueues", "Transactional messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "byteTxDequeues", "Transactional messages dequeued");
+ schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued");
+ schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued");
+ schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes");
+ schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval");
+ schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval");
+ schemaItem (buf, TYPE_UINT64, "enqueueTxStarts", "Total enqueue transactions started ");
+ schemaItem (buf, TYPE_UINT64, "enqueueTxCommits", "Total enqueue transactions committed");
+ schemaItem (buf, TYPE_UINT64, "enqueueTxRejects", "Total enqueue transactions rejected");
+ schemaItem (buf, TYPE_UINT32, "enqueueTxCount", "Current pending enqueue transactions");
+ schemaItem (buf, TYPE_UINT32, "enqueueTxCountLow", "Low water mark this interval");
+ schemaItem (buf, TYPE_UINT32, "enqueueTxCountHigh", "High water mark this interval");
+ schemaItem (buf, TYPE_UINT64, "dequeueTxStarts", "Total dequeue transactions started ");
+ schemaItem (buf, TYPE_UINT64, "dequeueTxCommits", "Total dequeue transactions committed");
+ schemaItem (buf, TYPE_UINT64, "dequeueTxRejects", "Total dequeue transactions rejected");
+ schemaItem (buf, TYPE_UINT32, "dequeueTxCount", "Current pending dequeue transactions");
+ schemaItem (buf, TYPE_UINT32, "dequeueTxCountLow", "Transaction low water mark this interval");
+ schemaItem (buf, TYPE_UINT32, "dequeueTxCountHigh", "Transaction high water mark this interval");
+ schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue");
+ schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval");
+ schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval");
+
+ schemaListEnd (buf);
+
+ schemaNeeded = false;
+}
+
+void ManagementObjectQueue::writeConfig (Buffer& buf)
+{
+ buf.putShortString (name);
+ buf.putOctet (durable ? 1 : 0);
+ buf.putOctet (autoDelete ? 1 : 0);
+
+ configChanged = false;
+}
+
+void ManagementObjectQueue::writeInstrumentation (Buffer& buf)
+{
+ buf.putLongLong (msgTotalEnqueues);
+ buf.putLongLong (msgTotalDequeues);
+ buf.putLongLong (msgTxEnqueues);
+ buf.putLongLong (msgTxDequeues);
+ buf.putLongLong (msgPersistEnqueues);
+ buf.putLongLong (msgPersistDequeues);
+ buf.putLong (msgDepth);
+ buf.putLong (msgDepthLow);
+ buf.putLong (msgDepthHigh);
+ buf.putLongLong (byteTotalEnqueues);
+ buf.putLongLong (byteTotalDequeues);
+ buf.putLongLong (byteTxEnqueues);
+ buf.putLongLong (byteTxDequeues);
+ buf.putLongLong (bytePersistEnqueues);
+ buf.putLongLong (bytePersistDequeues);
+ buf.putLong (byteDepth);
+ buf.putLong (byteDepthLow);
+ buf.putLong (byteDepthHigh);
+ buf.putLongLong (enqueueTxStarts);
+ buf.putLongLong (enqueueTxCommits);
+ buf.putLongLong (enqueueTxRejects);
+ buf.putLong (enqueueTxCount);
+ buf.putLong (enqueueTxCountLow);
+ buf.putLong (enqueueTxCountHigh);
+ buf.putLongLong (dequeueTxStarts);
+ buf.putLongLong (dequeueTxCommits);
+ buf.putLongLong (dequeueTxRejects);
+ buf.putLong (dequeueTxCount);
+ buf.putLong (dequeueTxCountLow);
+ buf.putLong (dequeueTxCountHigh);
+ buf.putLong (consumers);
+ buf.putLong (consumersLow);
+ buf.putLong (consumersHigh);
+
+ instChanged = false;
+}
diff --git a/cpp/src/qpid/broker/ManagementObjectQueue.h b/cpp/src/qpid/broker/ManagementObjectQueue.h
new file mode 100644
index 0000000000..989d10f8c0
--- /dev/null
+++ b/cpp/src/qpid/broker/ManagementObjectQueue.h
@@ -0,0 +1,179 @@
+#ifndef _ManagementObjectQueue_
+#define _ManagementObjectQueue_
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "ManagementObject.h"
+
+namespace qpid {
+namespace broker {
+
+const uint32_t MSG_MASK_TX = 1; // Transactional message
+const uint32_t MSG_MASK_PERSIST = 2; // Persistent message
+
+class ManagementObjectQueue : public ManagementObject
+{
+ private:
+
+ static bool schemaNeeded;
+
+ std::string objectName;
+ std::string name;
+ bool durable;
+ bool autoDelete;
+
+ uint64_t msgTotalEnqueues; // Total messages enqueued
+ uint64_t msgTotalDequeues; // Total messages dequeued
+ uint64_t msgTxEnqueues; // Transactional messages enqueued
+ uint64_t msgTxDequeues; // Transactional messages dequeued
+ uint64_t msgPersistEnqueues; // Persistent messages enqueued
+ uint64_t msgPersistDequeues; // Persistent messages dequeued
+
+ uint32_t msgDepth; // Current size of queue in messages
+ uint32_t msgDepthLow; // Low-water queue size, this interval
+ uint32_t msgDepthHigh; // High-water queue size, this interval
+
+ uint64_t byteTotalEnqueues; // Total messages enqueued
+ uint64_t byteTotalDequeues; // Total messages dequeued
+ uint64_t byteTxEnqueues; // Transactional messages enqueued
+ uint64_t byteTxDequeues; // Transactional messages dequeued
+ uint64_t bytePersistEnqueues; // Persistent messages enqueued
+ uint64_t bytePersistDequeues; // Persistent messages dequeued
+
+ uint32_t byteDepth; // Current size of queue in bytes
+ uint32_t byteDepthLow; // Low-water mark this interval
+ uint32_t byteDepthHigh; // High-water mark this interval
+
+ uint64_t enqueueTxStarts; // Total enqueue transactions started
+ uint64_t enqueueTxCommits; // Total enqueue transactions committed
+ uint64_t enqueueTxRejects; // Total enqueue transactions rejected
+
+ uint32_t enqueueTxCount; // Current pending enqueue transactions
+ uint32_t enqueueTxCountLow; // Low water mark this interval
+ uint32_t enqueueTxCountHigh; // High water mark this interval
+
+ uint64_t dequeueTxStarts; // Total dequeue transactions started
+ uint64_t dequeueTxCommits; // Total dequeue transactions committed
+ uint64_t dequeueTxRejects; // Total dequeue transactions rejected
+
+ uint32_t dequeueTxCount; // Current pending dequeue transactions
+ uint32_t dequeueTxCountLow; // Low water mark this interval
+ uint32_t dequeueTxCountHigh; // High water mark this interval
+
+ uint32_t consumers; // Current consumers on queue
+ uint32_t consumersLow; // Low water mark this interval
+ uint32_t consumersHigh; // High water mark this interval
+
+ uint16_t getObjectType (void) { return OBJECT_QUEUE; }
+ std::string getObjectName (void) { return objectName; }
+ void writeSchema (Buffer& buf);
+ void writeConfig (Buffer& buf);
+ void writeInstrumentation (Buffer& buf);
+ bool getSchemaNeeded (void) { return schemaNeeded; }
+
+ inline void adjustQueueHiLo (void){
+ if (msgDepth > msgDepthHigh) msgDepthHigh = msgDepth;
+ if (msgDepth < msgDepthLow) msgDepthLow = msgDepth;
+
+ if (byteDepth > byteDepthHigh) byteDepthHigh = byteDepth;
+ if (byteDepth < byteDepthLow) byteDepthLow = byteDepth;
+ instChanged = true;
+ }
+
+ inline void adjustTxHiLo (void){
+ if (enqueueTxCount > enqueueTxCountHigh) enqueueTxCountHigh = enqueueTxCount;
+ if (enqueueTxCount < enqueueTxCountLow) enqueueTxCountLow = enqueueTxCount;
+ if (dequeueTxCount > dequeueTxCountHigh) dequeueTxCountHigh = dequeueTxCount;
+ if (dequeueTxCount < dequeueTxCountLow) dequeueTxCountLow = dequeueTxCount;
+ instChanged = true;
+ }
+
+ inline void adjustConsumerHiLo (void){
+ if (consumers > consumersHigh) consumersHigh = consumers;
+ if (consumers < consumersLow) consumersLow = consumers;
+ instChanged = true;
+ }
+
+ public:
+
+ typedef boost::shared_ptr<ManagementObjectQueue> shared_ptr;
+
+ ManagementObjectQueue (std::string& name, bool durable, bool autoDelete);
+ ~ManagementObjectQueue (void);
+
+ // The following mask contents are used to describe enqueued or dequeued
+ // messages when counting statistics.
+
+ inline void enqueue (uint64_t bytes, uint32_t attrMask = 0){
+ msgTotalEnqueues++;
+ byteTotalEnqueues += bytes;
+
+ if (attrMask & MSG_MASK_TX){
+ msgTxEnqueues++;
+ byteTxEnqueues += bytes;
+ }
+
+ if (attrMask & MSG_MASK_PERSIST){
+ msgPersistEnqueues++;
+ bytePersistEnqueues += bytes;
+ }
+
+ msgDepth++;
+ byteDepth += bytes;
+ adjustQueueHiLo ();
+ }
+
+ inline void dequeue (uint64_t bytes, uint32_t attrMask = 0){
+ msgTotalDequeues++;
+ byteTotalDequeues += bytes;
+
+ if (attrMask & MSG_MASK_TX){
+ msgTxDequeues++;
+ byteTxDequeues += bytes;
+ }
+
+ if (attrMask & MSG_MASK_PERSIST){
+ msgPersistDequeues++;
+ bytePersistDequeues += bytes;
+ }
+
+ msgDepth--;
+ byteDepth -= bytes;
+ adjustQueueHiLo ();
+ }
+
+ inline void incConsumers (void){
+ consumers++;
+ adjustConsumerHiLo ();
+ }
+
+ inline void decConsumers (void){
+ consumers--;
+ adjustConsumerHiLo ();
+ }
+};
+
+}}
+
+
+
+#endif /*!_ManagementObjectQueue_*/
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index af248b8fae..456e055c74 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -77,7 +77,11 @@ void Queue::deliver(Message::shared_ptr& msg){
if (!enqueue(0, msg)){
push(msg);
msg->enqueueComplete();
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize ());
}else {
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
push(msg);
}
QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
@@ -89,6 +93,8 @@ void Queue::deliver(Message::shared_ptr& msg){
void Queue::recover(Message::shared_ptr& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize (), MSG_MASK_PERSIST);
if (store && !msg->isContentLoaded()) {
//content has not been loaded, need to ensure that lazy loading mode is set:
//TODO: find a nicer way to do this
@@ -97,8 +103,15 @@ void Queue::recover(Message::shared_ptr& msg){
}
void Queue::process(Message::shared_ptr& msg){
-
+
+ uint32_t mask = MSG_MASK_TX;
+
+ if (msg->isPersistent ())
+ mask |= MSG_MASK_PERSIST;
+
push(msg);
+ if (mgmtObjectPtr != 0)
+ mgmtObjectPtr->enqueue (msg->contentSize (), mask);
serializer.execute(dispatchCallback);
}
@@ -267,6 +280,14 @@ QueuedMessage Queue::dequeue(){
if(!messages.empty()){
msg = messages.front();
pop();
+ if (mgmtObjectPtr != 0){
+ uint32_t mask = 0;
+
+ if (msg.payload->isPersistent ())
+ mask |= MSG_MASK_PERSIST;
+
+ mgmtObjectPtr->dequeue (msg.payload->contentSize (), mask);
+ }
}
return msg;
}
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 082ccce246..24a9959d14 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -35,6 +35,7 @@
#include "PersistableQueue.h"
#include "QueuePolicy.h"
#include "QueueBindings.h"
+#include "ManagementObjectQueue.h"
namespace qpid {
namespace broker {
@@ -93,6 +94,7 @@ namespace qpid {
qpid::sys::Serializer<DispatchFunctor> serializer;
DispatchFunctor dispatchCallback;
framing::SequenceNumber sequence;
+ ManagementObjectQueue::shared_ptr mgmtObjectPtr;
void pop();
void push(Message::shared_ptr& msg);
@@ -130,6 +132,8 @@ namespace qpid {
void destroy();
void bound(const string& exchange, const string& key, const qpid::framing::FieldTable& args);
void unbind(ExchangeRegistry& exchanges, Queue::shared_ptr shared_ref);
+ void setMgmt (ManagementObjectQueue::shared_ptr mgmt) { mgmtObjectPtr = mgmt; }
+ ManagementObjectQueue::shared_ptr getMgmt (void) { return mgmtObjectPtr; }
bool acquire(const QueuedMessage& msg);
@@ -158,7 +162,7 @@ namespace qpid {
* Request dispatch any queued messages providing there are
* consumers for them. Only one thread can be dispatching
* at any time, so this call schedules the despatch based on
- * the serilizer policy.
+ * the serilizer policy.
*/
void requestDispatch(Consumer::ptr c = Consumer::ptr());
void flush(DispatchCompletion& callback);
diff --git a/cpp/src/qpid/broker/QueueRegistry.cpp b/cpp/src/qpid/broker/QueueRegistry.cpp
index ef1fb982e1..927de4c079 100644
--- a/cpp/src/qpid/broker/QueueRegistry.cpp
+++ b/cpp/src/qpid/broker/QueueRegistry.cpp
@@ -19,13 +19,16 @@
*
*/
#include "QueueRegistry.h"
+#include "ManagementAgent.h"
+#include "ManagementObjectQueue.h"
#include <sstream>
#include <assert.h>
using namespace qpid::broker;
using namespace qpid::sys;
-QueueRegistry::QueueRegistry(MessageStore* const _store) : counter(1), store(_store){}
+QueueRegistry::QueueRegistry(MessageStore* const _store) :
+ counter(1), store(_store) {}
QueueRegistry::~QueueRegistry(){}
@@ -37,9 +40,18 @@ QueueRegistry::declare(const string& declareName, bool durable,
string name = declareName.empty() ? generateName() : declareName;
assert(!name.empty());
QueueMap::iterator i = queues.find(name);
+
if (i == queues.end()) {
Queue::shared_ptr queue(new Queue(name, autoDelete, durable ? store : 0, owner));
queues[name] = queue;
+
+ if (managementAgent){
+ ManagementObjectQueue::shared_ptr mgmtObject(new ManagementObjectQueue (name, durable, autoDelete));
+
+ queue->setMgmt (mgmtObject);
+ managementAgent->addObject(dynamic_pointer_cast<ManagementObject>(mgmtObject));
+ }
+
return std::pair<Queue::shared_ptr, bool>(queue, true);
} else {
return std::pair<Queue::shared_ptr, bool>(i->second, false);
@@ -48,12 +60,24 @@ QueueRegistry::declare(const string& declareName, bool durable,
void QueueRegistry::destroy(const string& name){
RWlock::ScopedWlock locker(lock);
+
+ if (managementAgent){
+ ManagementObjectQueue::shared_ptr mgmtObject;
+ QueueMap::iterator i = queues.find(name);
+
+ if (i != queues.end()){
+ mgmtObject = i->second->getMgmt ();
+ managementAgent->deleteObject (dynamic_pointer_cast<ManagementObject>(mgmtObject));
+ }
+ }
+
queues.erase(name);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
RWlock::ScopedRlock locker(lock);
QueueMap::iterator i = queues.find(name);
+
if (i == queues.end()) {
return Queue::shared_ptr();
} else {
@@ -76,3 +100,14 @@ string QueueRegistry::generateName(){
MessageStore* const QueueRegistry::getStore() const {
return store;
}
+
+void QueueRegistry::setManagementAgent (ManagementAgent::shared_ptr agent)
+{
+ managementAgent = agent;
+}
+
+ManagementAgent::shared_ptr QueueRegistry::getManagementAgent (void)
+{
+ return managementAgent;
+}
+
diff --git a/cpp/src/qpid/broker/QueueRegistry.h b/cpp/src/qpid/broker/QueueRegistry.h
index f73f467945..07669bb3a1 100644
--- a/cpp/src/qpid/broker/QueueRegistry.h
+++ b/cpp/src/qpid/broker/QueueRegistry.h
@@ -24,6 +24,7 @@
#include <map>
#include "qpid/sys/Mutex.h"
#include "Queue.h"
+#include "ManagementAgent.h"
namespace qpid {
namespace broker {
@@ -87,6 +88,12 @@ class QueueRegistry{
* Return the message store used.
*/
MessageStore* const getStore() const;
+
+ /**
+ * Set/Get the ManagementAgent in use.
+ */
+ void setManagementAgent (ManagementAgent::shared_ptr agent);
+ ManagementAgent::shared_ptr getManagementAgent (void);
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
@@ -94,6 +101,7 @@ private:
qpid::sys::RWlock lock;
int counter;
MessageStore* const store;
+ ManagementAgent::shared_ptr managementAgent;
};
diff --git a/cpp/src/qpid/framing/Buffer.cpp b/cpp/src/qpid/framing/Buffer.cpp
index 758d0b5f3c..eaa4433b5f 100644
--- a/cpp/src/qpid/framing/Buffer.cpp
+++ b/cpp/src/qpid/framing/Buffer.cpp
@@ -34,8 +34,17 @@ void Buffer::record(){
r_position = position;
}
-void Buffer::restore(){
+void Buffer::restore(bool reRecord){
+ uint32_t savedPosition = position;
+
position = r_position;
+
+ if (reRecord)
+ r_position = savedPosition;
+}
+
+void Buffer::reset(){
+ position = 0;
}
uint32_t Buffer::available(){
diff --git a/cpp/src/qpid/framing/Buffer.h b/cpp/src/qpid/framing/Buffer.h
index 3b2e611881..190f736f46 100644
--- a/cpp/src/qpid/framing/Buffer.h
+++ b/cpp/src/qpid/framing/Buffer.h
@@ -41,7 +41,8 @@ public:
Buffer(char* data, uint32_t size);
void record();
- void restore();
+ void restore(bool reRecord = false);
+ void reset();
uint32_t available();
void putOctet(uint8_t i);