summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp126
1 files changed, 99 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e59857462c..b05172f984 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -20,6 +20,7 @@
*/
#include "qpid/broker/Broker.h"
+#include "qpid/broker/Cluster.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueEvents.h"
#include "qpid/broker/Exchange.h"
@@ -224,6 +225,7 @@ void Queue::requeue(const QueuedMessage& msg){
}
}
}
+ if (broker) broker->getCluster().release(msg);
copy.notify();
}
@@ -236,8 +238,22 @@ void Queue::clearLVQIndex(const QueuedMessage& msg){
}
}
+// Inform the cluster of an acquired message on exit from a function
+// that does the acquiring. The calling function should set qmsg
+// to the acquired message.
+struct ClusterAcquireOnExit {
+ Broker* broker;
+ QueuedMessage qmsg;
+ ClusterAcquireOnExit(Broker* b) : broker(b) {}
+ ~ClusterAcquireOnExit() {
+ if (broker && qmsg.queue) broker->getCluster().acquire(qmsg);
+ }
+};
+
bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& message)
{
+ ClusterAcquireOnExit willAcquire(broker);
+
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
QPID_LOG(debug, "Attempting to acquire message at " << position);
@@ -248,16 +264,18 @@ bool Queue::acquireMessageAt(const SequenceNumber& position, QueuedMessage& mess
if (lastValueQueue) {
clearLVQIndex(*i);
}
- QPID_LOG(debug,
- "Acquired message at " << i->position << " from " << name);
+ QPID_LOG(debug, "Acquired message at " << i->position << " from " << name);
+ willAcquire.qmsg = *i;
messages.erase(i);
return true;
- }
+ }
QPID_LOG(debug, "Could not acquire message at " << position << " from " << name << "; no message at that position");
return false;
}
bool Queue::acquire(const QueuedMessage& msg) {
+ ClusterAcquireOnExit acquire(broker);
+
Mutex::ScopedLock locker(messageLock);
assertClusterSafe();
@@ -265,16 +283,17 @@ bool Queue::acquire(const QueuedMessage& msg) {
Messages::iterator i = findAt(msg.position);
if ((i != messages.end() && i->position == msg.position) && // note that in some cases payload not be set
(!lastValueQueue ||
- (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
- ) {
+ (lastValueQueue && msg.payload.get() == checkLvqReplace(*i).payload.get()) ) // note this is safe for no payload set 0==0
+ ) {
clearLVQIndex(msg);
QPID_LOG(debug,
"Match found, acquire succeeded: " <<
i->position << " == " << msg.position);
+ acquire.qmsg = *i;
messages.erase(i);
return true;
- }
+ }
QPID_LOG(debug, "Acquire failed for " << msg.position);
return false;
@@ -314,6 +333,8 @@ bool Queue::getNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
{
while (true) {
+ ClusterAcquireOnExit willAcquire(broker); // Outside the lock
+
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
@@ -330,6 +351,7 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
+ willAcquire.qmsg = msg;
popMsg(msg);
return CONSUMED;
} else {
@@ -451,40 +473,51 @@ QueuedMessage Queue::find(SequenceNumber pos) const {
return QueuedMessage();
}
-void Queue::consume(Consumer::shared_ptr c, bool requestExclusive){
+void Queue::consume(Consumer::shared_ptr c, bool requestExclusive) {
assertClusterSafe();
- Mutex::ScopedLock locker(consumerLock);
- if(exclusive) {
- throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
- } else if(requestExclusive) {
- if(consumerCount) {
+ size_t consumers;
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ if(exclusive) {
throw ResourceLockedException(
- QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
- } else {
- exclusive = c->getSession();
+ QPID_MSG("Queue " << getName() << " has an exclusive consumer. No more consumers allowed."));
+ } else if(requestExclusive) {
+ if(consumerCount) {
+ throw ResourceLockedException(
+ QPID_MSG("Queue " << getName() << " already has consumers. Exclusive access denied."));
+ } else {
+ exclusive = c->getSession();
+ }
}
+ consumers = ++consumerCount;
+ if (mgmtObject != 0)
+ mgmtObject->inc_consumerCount ();
}
- consumerCount++;
- if (mgmtObject != 0)
- mgmtObject->inc_consumerCount ();
+ if (broker) broker->getCluster().consume(*this, consumers);
}
void Queue::cancel(Consumer::shared_ptr c){
removeListener(c);
- Mutex::ScopedLock locker(consumerLock);
- consumerCount--;
- if(exclusive) exclusive = 0;
- if (mgmtObject != 0)
- mgmtObject->dec_consumerCount ();
+ size_t consumers;
+ {
+ Mutex::ScopedLock locker(consumerLock);
+ consumers = --consumerCount;
+ if(exclusive) exclusive = 0;
+ if (mgmtObject != 0)
+ mgmtObject->dec_consumerCount ();
+ }
+ if (broker) broker->getCluster().cancel(*this, consumers);
}
QueuedMessage Queue::get(){
+ ClusterAcquireOnExit acquire(broker); // Outside lock
+
Mutex::ScopedLock locker(messageLock);
QueuedMessage msg(this);
if(!messages.empty()){
msg = getFront();
+ acquire.qmsg = msg;
popMsg(msg);
}
return msg;
@@ -609,10 +642,11 @@ void Queue::popMsg(QueuedMessage& qmsg)
void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
assertClusterSafe();
+ QueuedMessage qm;
QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
- QueuedMessage qm(this, msg, ++sequence);
+ qm = QueuedMessage(this, msg, ++sequence);
if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence);
LVQ::iterator i;
@@ -629,12 +663,14 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
boost::intrusive_ptr<Message> old = i->second->getReplacementMessage(this);
if (!old) old = i->second;
i->second->setReplacementMessage(msg,this);
+ // FIXME aconway 2010-10-15: it is incorrect to use qm.position below
+ // should be using the position of the message being replaced.
if (isRecovery) {
//can't issue new requests for the store until
//recovery is complete
pendingDequeues.push_back(QueuedMessage(qm.queue, old, qm.position));
} else {
- Mutex::ScopedUnlock u(messageLock);
+ Mutex::ScopedUnlock u(messageLock);
dequeue(0, QueuedMessage(qm.queue, old, qm.position));
}
}
@@ -651,6 +687,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
}
}
copy.notify();
+ if (broker) broker->getCluster().enqueue(qm);
}
QueuedMessage Queue::getFront()
@@ -792,12 +829,42 @@ void Queue::enqueueAborted(boost::intrusive_ptr<Message> msg)
if (policy.get()) policy->enqueueAborted(msg);
}
+void Queue::accept(TransactionContext* ctxt, const QueuedMessage& msg) {
+ if (broker) broker->getCluster().accept(msg);
+ dequeue(ctxt, msg);
+}
+
+struct ScopedClusterReject {
+ Broker* broker;
+ const QueuedMessage& qmsg;
+ ScopedClusterReject(Broker* b, const QueuedMessage& m) : broker(b), qmsg(m) {
+ if (broker) broker->getCluster().reject(qmsg);
+ }
+ ~ScopedClusterReject() {
+ if (broker) broker->getCluster().rejected(qmsg);
+ }
+};
+
+void Queue::reject(const QueuedMessage &msg) {
+ ScopedClusterReject scr(broker, msg);
+ Exchange::shared_ptr alternate = getAlternateExchange();
+ if (alternate) {
+ DeliverableMessage delivery(msg.payload);
+ alternate->route(delivery, msg.payload->getRoutingKey(), msg.payload->getApplicationHeaders());
+ QPID_LOG(info, "Routed rejected message from " << getName() << " to "
+ << alternate->getName());
+ } else {
+ //just drop it
+ QPID_LOG(info, "Dropping rejected message from " << getName());
+ }
+ dequeue(0, msg);
+}
+
// return true if store exists,
bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
{
ScopedUse u(barrier);
if (!u.acquired) return false;
-
{
Mutex::ScopedLock locker(messageLock);
if (!isEnqueued(msg)) return false;
@@ -846,6 +913,9 @@ void Queue::popAndDequeue()
*/
void Queue::dequeued(const QueuedMessage& msg)
{
+ // Note: Cluster::dequeued does only local book-keeping, no multicast
+ // So OK to call here with lock held.
+ if (broker) broker->getCluster().dequeue(msg);
if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
if (eventMode == ENQUEUE_AND_DEQUEUE && eventMgr) {
@@ -861,6 +931,7 @@ void Queue::create(const FieldTable& _settings)
store->create(*this, _settings);
}
configure(_settings);
+ if (broker) broker->getCluster().create(*this);
}
void Queue::configure(const FieldTable& _settings, bool recovering)
@@ -934,6 +1005,7 @@ void Queue::destroy()
store->destroy(*this);
store = 0;//ensure we make no more calls to the store for this queue
}
+ if (broker) broker->getCluster().destroy(*this);
}
void Queue::notifyDeleted()