diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 45 |
1 files changed, 34 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index d1f1afd61a..449488435d 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -88,7 +88,7 @@ inline void mgntEnqStats(const Message& msg, _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); - uint64_t contentSize = msg.getContentSize(); + uint64_t contentSize = msg.getMessageSize(); qStats->msgTotalEnqueues +=1; bStats->msgTotalEnqueues += 1; qStats->byteTotalEnqueues += contentSize; @@ -111,7 +111,7 @@ inline void mgntDeqStats(const Message& msg, if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); _qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics(); - uint64_t contentSize = msg.getContentSize(); + uint64_t contentSize = msg.getMessageSize(); qStats->msgTotalDequeues += 1; bStats->msgTotalDequeues += 1; @@ -131,7 +131,15 @@ inline void mgntDeqStats(const Message& msg, QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOptions) { QueueSettings settings(inputs); - if (!settings.maxDepth.hasSize() && globalOptions.queueLimit) { + settings.maxDepth = QueueDepth(); + if (inputs.maxDepth.hasCount() && inputs.maxDepth.getCount()) { + settings.maxDepth.setCount(inputs.maxDepth.getCount()); + } + if (inputs.maxDepth.hasSize()) { + if (inputs.maxDepth.getSize()) { + settings.maxDepth.setSize(inputs.maxDepth.getSize()); + } + } else if (globalOptions.queueLimit) { settings.maxDepth.setSize(globalOptions.queueLimit); } return settings; @@ -194,7 +202,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings, redirectSource(false) { current.setCount(0);//always track depth in messages - if (settings.maxDepth.hasSize()) current.setSize(0);//track depth in bytes only if policy requires it + if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it if (settings.traceExcludes.size()) { split(traceExclude, settings.traceExcludes, ", "); } @@ -297,7 +305,7 @@ void Queue::deliverTo(Message msg, TxBuffer* txn) void Queue::recoverPrepared(const Message& msg) { Mutex::ScopedLock locker(messageLock); - current += QueueDepth(1, msg.getContentSize()); + current += QueueDepth(1, msg.getMessageSize()); } void Queue::recover(Message& msg) @@ -311,7 +319,7 @@ void Queue::process(Message& msg) push(msg); if (mgmtObject != 0){ _qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics(); - const uint64_t contentSize = msg.getContentSize(); + const uint64_t contentSize = msg.getMessageSize(); qStats->msgTxnEnqueues += 1; qStats->byteTxnEnqueues += contentSize; mgmtObject->statisticsUpdated(); @@ -853,7 +861,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message& msg) { Mutex::ScopedLock locker(messageLock); - if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) { + if (!checkDepth(QueueDepth(1, msg.getMessageSize()), msg)) { return false; } } @@ -883,7 +891,7 @@ void Queue::enqueueAborted(const Message& msg) //Called when any transactional enqueue is aborted (including but //not limited to a recovered dtx transaction) Mutex::ScopedLock locker(messageLock); - current -= QueueDepth(1, msg.getContentSize()); + current -= QueueDepth(1, msg.getMessageSize()); } void Queue::enqueueCommited(Message& msg) @@ -911,7 +919,7 @@ void Queue::dequeueCommited(const Message& msg) observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); - mgmtObject->inc_byteTxnDequeues(msg.getContentSize()); + mgmtObject->inc_byteTxnDequeues(msg.getMessageSize()); } } @@ -954,7 +962,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor) Mutex::ScopedLock locker(messageLock); Message* msg = messages->find(cursor); if (msg) { - const uint64_t contentSize = msg->getContentSize(); + const uint64_t contentSize = msg->getMessageSize(); observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0); if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); @@ -978,7 +986,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor) */ void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete) { - current -= QueueDepth(1, msg.getContentSize()); + current -= QueueDepth(1, msg.getMessageSize()); mgntDeqStats(msg, mgmtObject, brokerMgmtObject); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ @@ -1182,18 +1190,27 @@ void Queue::encode(Buffer& buffer) const buffer.putShortString(name); buffer.put(encodableSettings); buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string("")); + buffer.putShortString(userId); } uint32_t Queue::encodedSize() const { return name.size() + 1/*short string size octet*/ + (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */ + + userId.size() + 1 /* short string */ + encodableSettings.encodedSize(); } +void Queue::updateAclUserQueueCount() +{ + if (broker->getAcl()) + broker->getAcl()->approveCreateQueue(userId, name); +} + Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer ) { string name; + string _userId; buffer.getShortString(name); FieldTable ft; buffer.get(ft); @@ -1207,6 +1224,12 @@ Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer ) result.first->alternateExchangeName.assign(altExch); } + //get userId of queue's creator; ACL counters for userId are done after ACL plugin is initialized + if (buffer.available()) { + buffer.getShortString(_userId); + result.first->setOwningUser(_userId); + } + return result.first; } |