summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp45
1 files changed, 34 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index d1f1afd61a..449488435d 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -88,7 +88,7 @@ inline void mgntEnqStats(const Message& msg,
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg.getContentSize();
+ uint64_t contentSize = msg.getMessageSize();
qStats->msgTotalEnqueues +=1;
bStats->msgTotalEnqueues += 1;
qStats->byteTotalEnqueues += contentSize;
@@ -111,7 +111,7 @@ inline void mgntDeqStats(const Message& msg,
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
_qmf::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
- uint64_t contentSize = msg.getContentSize();
+ uint64_t contentSize = msg.getMessageSize();
qStats->msgTotalDequeues += 1;
bStats->msgTotalDequeues += 1;
@@ -131,7 +131,15 @@ inline void mgntDeqStats(const Message& msg,
QueueSettings merge(const QueueSettings& inputs, const Broker::Options& globalOptions)
{
QueueSettings settings(inputs);
- if (!settings.maxDepth.hasSize() && globalOptions.queueLimit) {
+ settings.maxDepth = QueueDepth();
+ if (inputs.maxDepth.hasCount() && inputs.maxDepth.getCount()) {
+ settings.maxDepth.setCount(inputs.maxDepth.getCount());
+ }
+ if (inputs.maxDepth.hasSize()) {
+ if (inputs.maxDepth.getSize()) {
+ settings.maxDepth.setSize(inputs.maxDepth.getSize());
+ }
+ } else if (globalOptions.queueLimit) {
settings.maxDepth.setSize(globalOptions.queueLimit);
}
return settings;
@@ -194,7 +202,7 @@ Queue::Queue(const string& _name, const QueueSettings& _settings,
redirectSource(false)
{
current.setCount(0);//always track depth in messages
- if (settings.maxDepth.hasSize()) current.setSize(0);//track depth in bytes only if policy requires it
+ if (settings.maxDepth.getSize()) current.setSize(0);//track depth in bytes only if policy requires it
if (settings.traceExcludes.size()) {
split(traceExclude, settings.traceExcludes, ", ");
}
@@ -297,7 +305,7 @@ void Queue::deliverTo(Message msg, TxBuffer* txn)
void Queue::recoverPrepared(const Message& msg)
{
Mutex::ScopedLock locker(messageLock);
- current += QueueDepth(1, msg.getContentSize());
+ current += QueueDepth(1, msg.getMessageSize());
}
void Queue::recover(Message& msg)
@@ -311,7 +319,7 @@ void Queue::process(Message& msg)
push(msg);
if (mgmtObject != 0){
_qmf::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
- const uint64_t contentSize = msg.getContentSize();
+ const uint64_t contentSize = msg.getMessageSize();
qStats->msgTxnEnqueues += 1;
qStats->byteTxnEnqueues += contentSize;
mgmtObject->statisticsUpdated();
@@ -853,7 +861,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message& msg)
{
Mutex::ScopedLock locker(messageLock);
- if (!checkDepth(QueueDepth(1, msg.getContentSize()), msg)) {
+ if (!checkDepth(QueueDepth(1, msg.getMessageSize()), msg)) {
return false;
}
}
@@ -883,7 +891,7 @@ void Queue::enqueueAborted(const Message& msg)
//Called when any transactional enqueue is aborted (including but
//not limited to a recovered dtx transaction)
Mutex::ScopedLock locker(messageLock);
- current -= QueueDepth(1, msg.getContentSize());
+ current -= QueueDepth(1, msg.getMessageSize());
}
void Queue::enqueueCommited(Message& msg)
@@ -911,7 +919,7 @@ void Queue::dequeueCommited(const Message& msg)
observeDequeue(msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
- mgmtObject->inc_byteTxnDequeues(msg.getContentSize());
+ mgmtObject->inc_byteTxnDequeues(msg.getMessageSize());
}
}
@@ -954,7 +962,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor)
Mutex::ScopedLock locker(messageLock);
Message* msg = messages->find(cursor);
if (msg) {
- const uint64_t contentSize = msg->getContentSize();
+ const uint64_t contentSize = msg->getMessageSize();
observeDequeue(*msg, locker, settings.autodelete ? &autodelete : 0);
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
@@ -978,7 +986,7 @@ void Queue::dequeueCommitted(const QueueCursor& cursor)
*/
void Queue::observeDequeue(const Message& msg, const Mutex::ScopedLock& lock, ScopedAutoDelete* autodelete)
{
- current -= QueueDepth(1, msg.getContentSize());
+ current -= QueueDepth(1, msg.getMessageSize());
mgntDeqStats(msg, mgmtObject, brokerMgmtObject);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
@@ -1182,18 +1190,27 @@ void Queue::encode(Buffer& buffer) const
buffer.putShortString(name);
buffer.put(encodableSettings);
buffer.putShortString(alternateExchange.get() ? alternateExchange->getName() : std::string(""));
+ buffer.putShortString(userId);
}
uint32_t Queue::encodedSize() const
{
return name.size() + 1/*short string size octet*/
+ (alternateExchange.get() ? alternateExchange->getName().size() : 0) + 1 /* short string */
+ + userId.size() + 1 /* short string */
+ encodableSettings.encodedSize();
}
+void Queue::updateAclUserQueueCount()
+{
+ if (broker->getAcl())
+ broker->getAcl()->approveCreateQueue(userId, name);
+}
+
Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
{
string name;
+ string _userId;
buffer.getShortString(name);
FieldTable ft;
buffer.get(ft);
@@ -1207,6 +1224,12 @@ Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer )
result.first->alternateExchangeName.assign(altExch);
}
+ //get userId of queue's creator; ACL counters for userId are done after ACL plugin is initialized
+ if (buffer.available()) {
+ buffer.getShortString(_userId);
+ result.first->setOwningUser(_userId);
+ }
+
return result.first;
}