summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp37
1 files changed, 24 insertions, 13 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e4a6449e08..8c990795e7 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -22,6 +22,7 @@
#include <boost/format.hpp>
#include "qpid/log/Statement.h"
+#include "Broker.h"
#include "Queue.h"
#include "Exchange.h"
#include "DeliverableMessage.h"
@@ -47,7 +48,6 @@ Queue::Queue(const string& _name, bool _autodelete,
store(_store),
owner(_owner),
next(0),
- exclusive(0),
persistenceId(0),
serializer(false),
dispatchCallback(*this)
@@ -80,7 +80,7 @@ void Queue::deliver(Message::shared_ptr& msg){
}else {
push(msg);
}
- QPID_LOG(debug, "Message Enqueued: " << msg->getApplicationHeaders());
+ QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << this << "]");
serializer.execute(dispatchCallback);
}
}
@@ -124,7 +124,7 @@ bool Queue::acquire(const QueuedMessage& msg) {
return false;
}
-void Queue::requestDispatch(Consumer* c){
+void Queue::requestDispatch(Consumer::ptr c){
if (!c || c->preAcquires()) {
serializer.execute(dispatchCallback);
} else {
@@ -138,12 +138,12 @@ void Queue::flush(DispatchCompletion& completion)
serializer.execute(f);
}
-Consumer* Queue::allocate()
+Consumer::ptr Queue::allocate()
{
RWlock::ScopedWlock locker(consumerLock);
if(acquirers.empty()){
- return 0;
+ return Consumer::ptr();
}else if(exclusive){
return exclusive;
}else{
@@ -154,14 +154,16 @@ Consumer* Queue::allocate()
bool Queue::dispatch(QueuedMessage& msg)
{
- Consumer* c = allocate();
- Consumer* first = c;
+ Consumer::ptr c = allocate();
+ Consumer::ptr first = c;
while(c){
if(c->deliver(msg)) {
return true;
} else {
c = allocate();
- if (c == first) c = 0;
+ if (c == first) {
+ break;
+ }
}
}
return false;
@@ -199,7 +201,7 @@ void Queue::serviceAllBrowsers()
}
}
-void Queue::serviceBrowser(Consumer* browser)
+void Queue::serviceBrowser(Consumer::ptr browser)
{
QueuedMessage msg;
while (seek(msg, browser->position) && browser->deliver(msg)) {
@@ -219,7 +221,7 @@ bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
return false;
}
-void Queue::consume(Consumer* c, bool requestExclusive){
+void Queue::consume(Consumer::ptr c, bool requestExclusive){
RWlock::ScopedWlock locker(consumerLock);
if(exclusive) {
throw ChannelException(
@@ -242,17 +244,17 @@ void Queue::consume(Consumer* c, bool requestExclusive){
}
}
-void Queue::cancel(Consumer* c){
+void Queue::cancel(Consumer::ptr c){
RWlock::ScopedWlock locker(consumerLock);
if (c->preAcquires()) {
cancel(c, acquirers);
} else {
cancel(c, browsers);
}
- if(exclusive == c) exclusive = 0;
+ if(exclusive == c) exclusive.reset();
}
-void Queue::cancel(Consumer* c, Consumers& consumers)
+void Queue::cancel(Consumer::ptr c, Consumers& consumers)
{
Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
if (i != consumers.end())
@@ -442,3 +444,12 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange()
return alternateExchange;
}
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+ if (broker.getQueues().destroyIf(queue->getName(),
+ boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+ queue->unbind(broker.getExchanges(), queue);
+ queue->destroy();
+ }
+
+}