diff options
Diffstat (limited to 'cpp/src/qpid/broker/management/ManagementObjectQueue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/management/ManagementObjectQueue.cpp | 186 |
1 files changed, 186 insertions, 0 deletions
diff --git a/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp b/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp new file mode 100644 index 0000000000..70913ea910 --- /dev/null +++ b/cpp/src/qpid/broker/management/ManagementObjectQueue.cpp @@ -0,0 +1,186 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ManagementObjectQueue.h" + +using namespace qpid::broker; +using namespace qpid::sys; +using namespace qpid::framing; + +bool ManagementObjectQueue::schemaNeeded = true; + +ManagementObjectQueue::ManagementObjectQueue (std::string& _name, + bool _durable, bool _autoDelete) : + vhostName("/"), name(_name), durable(_durable), autoDelete(_autoDelete) +{ + msgTotalEnqueues = 0; + msgTotalDequeues = 0; + msgTxEnqueues = 0; + msgTxDequeues = 0; + msgPersistEnqueues = 0; + msgPersistDequeues = 0; + + msgDepth = 0; + msgDepthLow = 0; + msgDepthHigh = 0; + + byteTotalEnqueues = 0; + byteTotalDequeues = 0; + byteTxEnqueues = 0; + byteTxDequeues = 0; + bytePersistEnqueues = 0; + bytePersistDequeues = 0; + + byteDepth = 0; + byteDepthLow = 0; + byteDepthHigh = 0; + + enqueueTxStarts = 0; + enqueueTxCommits = 0; + enqueueTxRejects = 0; + dequeueTxStarts = 0; + dequeueTxCommits = 0; + dequeueTxRejects = 0; + + enqueueTxCount = 0; + enqueueTxCountLow = 0; + enqueueTxCountHigh = 0; + + dequeueTxCount = 0; + dequeueTxCountLow = 0; + dequeueTxCountHigh = 0; + + consumers = 0; + consumersLow = 0; + consumersHigh = 0; +} + +ManagementObjectQueue::~ManagementObjectQueue () {} + +void ManagementObjectQueue::writeSchema (Buffer& buf) +{ + schemaNeeded = false; + + schemaItem (buf, TYPE_STRING, "vhostRef", "Virtual Host Ref", true, true); + schemaItem (buf, TYPE_STRING, "name", "Queue Name", true, true); + schemaItem (buf, TYPE_BOOL, "durable", "Durable", true); + schemaItem (buf, TYPE_BOOL, "autoDelete", "AutoDelete", true); + + schemaItem (buf, TYPE_UINT64, "msgTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgTxEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgTxDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "msgPersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "msgPersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "msgDepth", "Current size of queue in messages"); + schemaItem (buf, TYPE_UINT32, "msgDepthLow", "Low-water queue size, this interval"); + schemaItem (buf, TYPE_UINT32, "msgDepthHigh", "High-water queue size, this interval"); + schemaItem (buf, TYPE_UINT64, "byteTotalEnqueues", "Total messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTotalDequeues", "Total messages dequeued"); + schemaItem (buf, TYPE_UINT64, "byteTxEnqueues", "Transactional messages enqueued"); + schemaItem (buf, TYPE_UINT64, "byteTxDequeues", "Transactional messages dequeued"); + schemaItem (buf, TYPE_UINT64, "bytePersistEnqueues", "Persistent messages enqueued"); + schemaItem (buf, TYPE_UINT64, "bytePersistDequeues", "Persistent messages dequeued"); + schemaItem (buf, TYPE_UINT32, "byteDepth", "Current size of queue in bytes"); + schemaItem (buf, TYPE_UINT32, "byteDepthLow", "Low-water mark this interval"); + schemaItem (buf, TYPE_UINT32, "byteDepthHigh", "High-water mark this interval"); + schemaItem (buf, TYPE_UINT64, "enqueueTxStarts", "Total enqueue transactions started "); + schemaItem (buf, TYPE_UINT64, "enqueueTxCommits", "Total enqueue transactions committed"); + schemaItem (buf, TYPE_UINT64, "enqueueTxRejects", "Total enqueue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCount", "Current pending enqueue transactions"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCountLow", "Low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "enqueueTxCountHigh", "High water mark this interval"); + schemaItem (buf, TYPE_UINT64, "dequeueTxStarts", "Total dequeue transactions started "); + schemaItem (buf, TYPE_UINT64, "dequeueTxCommits", "Total dequeue transactions committed"); + schemaItem (buf, TYPE_UINT64, "dequeueTxRejects", "Total dequeue transactions rejected"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCount", "Current pending dequeue transactions"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCountLow", "Transaction low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "dequeueTxCountHigh", "Transaction high water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumers", "Current consumers on queue"); + schemaItem (buf, TYPE_UINT32, "consumersLow", "Consumer low water mark this interval"); + schemaItem (buf, TYPE_UINT32, "consumersHigh", "Consumer high water mark this interval"); + + schemaListEnd (buf); +} + +void ManagementObjectQueue::writeConfig (Buffer& buf) +{ + configChanged = false; + + writeTimestamps (buf); + buf.putShortString (vhostName); + buf.putShortString (name); + buf.putOctet (durable ? 1 : 0); + buf.putOctet (autoDelete ? 1 : 0); +} + +void ManagementObjectQueue::writeInstrumentation (Buffer& buf) +{ + instChanged = false; + + writeTimestamps (buf); + buf.putShortString (vhostName); + buf.putShortString (name); + buf.putLongLong (msgTotalEnqueues); + buf.putLongLong (msgTotalDequeues); + buf.putLongLong (msgTxEnqueues); + buf.putLongLong (msgTxDequeues); + buf.putLongLong (msgPersistEnqueues); + buf.putLongLong (msgPersistDequeues); + buf.putLong (msgDepth); + buf.putLong (msgDepthLow); + buf.putLong (msgDepthHigh); + buf.putLongLong (byteTotalEnqueues); + buf.putLongLong (byteTotalDequeues); + buf.putLongLong (byteTxEnqueues); + buf.putLongLong (byteTxDequeues); + buf.putLongLong (bytePersistEnqueues); + buf.putLongLong (bytePersistDequeues); + buf.putLong (byteDepth); + buf.putLong (byteDepthLow); + buf.putLong (byteDepthHigh); + buf.putLongLong (enqueueTxStarts); + buf.putLongLong (enqueueTxCommits); + buf.putLongLong (enqueueTxRejects); + buf.putLong (enqueueTxCount); + buf.putLong (enqueueTxCountLow); + buf.putLong (enqueueTxCountHigh); + buf.putLongLong (dequeueTxStarts); + buf.putLongLong (dequeueTxCommits); + buf.putLongLong (dequeueTxRejects); + buf.putLong (dequeueTxCount); + buf.putLong (dequeueTxCountLow); + buf.putLong (dequeueTxCountHigh); + buf.putLong (consumers); + buf.putLong (consumersLow); + buf.putLong (consumersHigh); + + msgDepthLow = msgDepth; + msgDepthHigh = msgDepth; + byteDepthLow = byteDepth; + byteDepthHigh = byteDepth; + enqueueTxCountLow = enqueueTxCount; + enqueueTxCountHigh = enqueueTxCount; + dequeueTxCountLow = dequeueTxCount; + dequeueTxCountHigh = dequeueTxCount; + consumersLow = consumers; + consumersHigh = consumers; +} |