summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-31 16:45:20 +0000
committerGordon Sim <gsim@apache.org>2007-08-31 16:45:20 +0000
commitf9236f2f81a1df20a4a95d2e8dc8538b33fb4746 (patch)
tree66570f8ee6b0adaf5906cd724debe3ed5404d3f2 /qpid/cpp
parent0c9a820ac910c913e0a256f3d292111ebf2efa37 (diff)
downloadqpid-python-f9236f2f81a1df20a4a95d2e8dc8538b33fb4746.tar.gz
Pass QueuedMessage to queues consumers. This records the position of that message in the queue which is need to handle rlease and acquire.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@571518 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerChannel.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerChannel.h12
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerQueue.cpp38
-rw-r--r--qpid/cpp/src/qpid/broker/BrokerQueue.h12
-rw-r--r--qpid/cpp/src/qpid/broker/Consumer.h13
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.cpp34
-rw-r--r--qpid/cpp/src/qpid/broker/DeliveryRecord.h12
-rw-r--r--qpid/cpp/src/tests/BrokerChannelTest.cpp12
-rw-r--r--qpid/cpp/src/tests/QueueTest.cpp16
-rw-r--r--qpid/cpp/src/tests/TxAckTest.cpp4
-rw-r--r--qpid/cpp/src/tests/TxPublishTest.cpp4
11 files changed, 96 insertions, 79 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
index 615a26beab..ceecdf3040 100644
--- a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -218,31 +218,33 @@ Channel::ConsumerImpl::ConsumerImpl(Channel* _parent,
const string& _name,
Queue::shared_ptr _queue,
bool ack,
- bool _nolocal
+ bool _nolocal,
+ bool _acquire
) : parent(_parent),
token(_token),
name(_name),
queue(_queue),
ackExpected(ack),
nolocal(_nolocal),
+ acquire(_acquire),
blocked(false),
windowing(true),
msgCredit(0xFFFFFFFF),
byteCredit(0xFFFFFFFF) {}
-bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg)
+bool Channel::ConsumerImpl::deliver(QueuedMessage& msg)
{
- if (nolocal && &(parent->connection) == msg->getPublisher()) {
+ if (nolocal && &(parent->connection) == msg.payload->getPublisher()) {
return false;
} else {
- if (!checkCredit(msg) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))) {
+ if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) {
blocked = true;
} else {
blocked = false;
Mutex::ScopedLock locker(parent->deliveryLock);
- DeliveryId deliveryTag = parent->out.deliver(msg, token);
+ DeliveryId deliveryTag = parent->out.deliver(msg.payload, token);
if (ackExpected) {
parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
}
@@ -409,10 +411,10 @@ void Channel::recover(bool requeue)
bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
{
- Message::shared_ptr msg = queue->dequeue();
- if(msg){
+ QueuedMessage msg = queue->dequeue();
+ if(msg.payload){
Mutex::ScopedLock locker(deliveryLock);
- DeliveryId myDeliveryTag = out.deliver(msg, token);
+ DeliveryId myDeliveryTag = out.deliver(msg.payload, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.h b/qpid/cpp/src/qpid/broker/BrokerChannel.h
index cdbab37ebc..98ee073d3d 100644
--- a/qpid/cpp/src/qpid/broker/BrokerChannel.h
+++ b/qpid/cpp/src/qpid/broker/BrokerChannel.h
@@ -69,17 +69,20 @@ class Channel
const Queue::shared_ptr queue;
const bool ackExpected;
const bool nolocal;
+ const bool acquire;
bool blocked;
bool windowing;
- uint32_t msgCredit;
-
+ uint32_t msgCredit;
uint32_t byteCredit;
+ bool checkCredit(Message::shared_ptr& msg);
+
public:
ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token,
- const string& name, Queue::shared_ptr queue, bool ack, bool nolocal);
+ const string& name, Queue::shared_ptr queue,
+ bool ack, bool nolocal, bool acquire = true);
~ConsumerImpl();
- bool deliver(Message::shared_ptr& msg);
+ bool deliver(QueuedMessage& msg);
void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
void cancel();
void requestDispatch();
@@ -90,7 +93,6 @@ class Channel
void addMessageCredit(uint32_t value);
void flush();
void stop();
- bool checkCredit(Message::shared_ptr& msg);
void acknowledged(const DeliveryRecord&);
};
diff --git a/qpid/cpp/src/qpid/broker/BrokerQueue.cpp b/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
index 7311d043d0..553f6016d2 100644
--- a/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
@@ -102,10 +102,10 @@ void Queue::process(Message::shared_ptr& msg){
}
-void Queue::requeue(Message::shared_ptr& msg){
+void Queue::requeue(const QueuedMessage& msg){
{
Mutex::ScopedLock locker(messageLock);
- msg->enqueueComplete(); // mark the message as enqueued
+ msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
}
serializer.execute(dispatchCallback);
@@ -118,7 +118,7 @@ void Queue::requestDispatch(){
}
-bool Queue::dispatch(Message::shared_ptr& msg){
+bool Queue::dispatch(QueuedMessage& msg){
RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
@@ -144,21 +144,19 @@ bool Queue::dispatch(Message::shared_ptr& msg){
void Queue::dispatch(){
-
-
- Message::shared_ptr msg;
+ QueuedMessage msg;
while(true){
{
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) break;
msg = messages.front();
}
- if( msg->isEnqueueComplete() && dispatch(msg) ){
+ if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
pop();
- }else break;
-
- }
-
+ } else {
+ break;
+ }
+ }
}
void Queue::consume(Consumer* c, bool requestExclusive){
@@ -185,18 +183,16 @@ void Queue::cancel(Consumer* c){
if(exclusive == c) exclusive = 0;
}
-Message::shared_ptr Queue::dequeue(){
+QueuedMessage Queue::dequeue(){
Mutex::ScopedLock locker(messageLock);
- Message::shared_ptr msg;
+ QueuedMessage msg;
if(!messages.empty()){
msg = messages.front();
- if (msg->isEnqueueComplete()){
+ if (msg.payload->isEnqueueComplete()){
pop();
- return msg;
}
}
- Message::shared_ptr msg_empty;
- return msg_empty;
+ return msg;
}
uint32_t Queue::purge(){
@@ -208,13 +204,13 @@ uint32_t Queue::purge(){
void Queue::pop(){
Mutex::ScopedLock locker(messageLock);
- if (policy.get()) policy->dequeued(messages.front()->contentSize());
+ if (policy.get()) policy->dequeued(messages.front().payload->contentSize());
messages.pop_front();
}
void Queue::push(Message::shared_ptr& msg){
Mutex::ScopedLock locker(messageLock);
- messages.push_back(msg);
+ messages.push_back(QueuedMessage(msg, ++sequence));
if (policy.get()) {
policy->enqueued(msg->contentSize());
if (policy->limitExceeded()) {
@@ -229,7 +225,7 @@ uint32_t Queue::getMessageCount() const{
uint32_t count =0;
for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) {
- if ( (*i)->isEnqueueComplete() ) count ++;
+ if ( i->payload->isEnqueueComplete() ) count ++;
}
return count;
@@ -296,7 +292,7 @@ void Queue::destroy()
if (alternateExchange.get()) {
Mutex::ScopedLock locker(messageLock);
while(!messages.empty()){
- DeliverableMessage msg(messages.front());
+ DeliverableMessage msg(messages.front().payload);
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
&(msg.getMessage().getApplicationHeaders()));
pop();
diff --git a/qpid/cpp/src/qpid/broker/BrokerQueue.h b/qpid/cpp/src/qpid/broker/BrokerQueue.h
index 5ba103d3ed..d15b5fc8c5 100644
--- a/qpid/cpp/src/qpid/broker/BrokerQueue.h
+++ b/qpid/cpp/src/qpid/broker/BrokerQueue.h
@@ -46,9 +46,6 @@ namespace qpid {
class TransactionContext;
class Exchange;
- /**
- * Thrown when exclusive access would be violated.
- */
using std::string;
/**
@@ -59,7 +56,7 @@ namespace qpid {
*/
class Queue : public PersistableQueue{
typedef std::vector<Consumer*> Consumers;
- typedef std::deque<Message::shared_ptr> Messages;
+ typedef std::deque<QueuedMessage> Messages;
struct DispatchFunctor {
Queue& queue;
@@ -84,10 +81,11 @@ namespace qpid {
boost::shared_ptr<Exchange> alternateExchange;
qpid::sys::Serializer<DispatchFunctor> serializer;
DispatchFunctor dispatchCallback;
+ framing::SequenceNumber sequence;
void pop();
void push(Message::shared_ptr& msg);
- bool dispatch(Message::shared_ptr& msg);
+ bool dispatch(QueuedMessage& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
/**
* only called by serilizer
@@ -132,7 +130,7 @@ namespace qpid {
* available it will be dispatched immediately, else it
* will be returned to the front of the queue.
*/
- void requeue(Message::shared_ptr& msg);
+ void requeue(const QueuedMessage& msg);
/**
* Used during recovery to add stored messages back to the queue
*/
@@ -166,7 +164,7 @@ namespace qpid {
/**
* dequeues from memory only
*/
- Message::shared_ptr dequeue();
+ QueuedMessage dequeue();
const QueuePolicy* const getPolicy();
diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h
index dc229947b9..52da25082c 100644
--- a/qpid/cpp/src/qpid/broker/Consumer.h
+++ b/qpid/cpp/src/qpid/broker/Consumer.h
@@ -25,9 +25,20 @@
namespace qpid {
namespace broker {
+
+ struct QueuedMessage
+ {
+ Message::shared_ptr payload;
+ framing::SequenceNumber position;
+
+ QueuedMessage(Message::shared_ptr msg, framing::SequenceNumber sn) : payload(msg), position(sn) {}
+ QueuedMessage() {}
+ };
+
+
class Consumer{
public:
- virtual bool deliver(Message::shared_ptr& msg) = 0;
+ virtual bool deliver(QueuedMessage& msg) = 0;
virtual ~Consumer(){}
};
}
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
index 43f85b9b6e..f0239ed261 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -24,26 +24,28 @@
using namespace qpid::broker;
using std::string;
-DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
+DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
Queue::shared_ptr _queue,
const string _consumerTag,
const DeliveryId _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(_consumerTag),
- deliveryTag(_deliveryTag),
- pull(false){}
+ queue(_queue),
+ consumerTag(_consumerTag),
+ deliveryTag(_deliveryTag),
+ acquired(false),
+ pull(false){}
-DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg,
+DeliveryRecord::DeliveryRecord(QueuedMessage& _msg,
Queue::shared_ptr _queue,
const DeliveryId _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(""),
- deliveryTag(_deliveryTag),
- pull(true){}
+ queue(_queue),
+ consumerTag(""),
+ deliveryTag(_deliveryTag),
+ acquired(false),
+ pull(true){}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
- queue->dequeue(ctxt, msg);
+ queue->dequeue(ctxt, msg.payload);
}
bool DeliveryRecord::matches(DeliveryId tag) const{
@@ -67,18 +69,18 @@ void DeliveryRecord::redeliver(Channel* const channel) const{
//if message was originally sent as response to get, we must requeue it
requeue();
}else{
- channel->deliver(msg, consumerTag, deliveryTag);
+ channel->deliver(msg.payload, consumerTag, deliveryTag);
}
}
void DeliveryRecord::requeue() const{
- msg->redeliver();
+ msg.payload->redeliver();
queue->requeue(msg);
}
void DeliveryRecord::updateByteCredit(uint32_t& credit) const
{
- credit += msg->getRequiredCredit();
+ credit += msg.payload->getRequiredCredit();
}
@@ -86,7 +88,7 @@ void DeliveryRecord::addTo(Prefetch& prefetch) const{
if(!pull){
//ignore 'pulled' messages (i.e. those that were sent in
//response to get) when calculating prefetch
- prefetch.size += msg->contentSize();
+ prefetch.size += msg.payload->contentSize();
prefetch.count++;
}
}
@@ -95,7 +97,7 @@ void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{
if(!pull){
//ignore 'pulled' messages (i.e. those that were sent in
//response to get) when calculating prefetch
- prefetch.size -= msg->contentSize();
+ prefetch.size -= msg.payload->contentSize();
prefetch.count--;
}
}
diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
index a1f82cb757..a1086488c4 100644
--- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h
@@ -26,6 +26,7 @@
#include <ostream>
#include "AccumulatedAck.h"
#include "BrokerQueue.h"
+#include "Consumer.h"
#include "DeliveryId.h"
#include "Message.h"
#include "Prefetch.h"
@@ -38,15 +39,16 @@ namespace qpid {
* Record of a delivery for which an ack is outstanding.
*/
class DeliveryRecord{
- mutable Message::shared_ptr msg;
+ mutable QueuedMessage msg;
mutable Queue::shared_ptr queue;
const std::string consumerTag;
const DeliveryId deliveryTag;
- bool pull;
+ bool acquired;
+ const bool pull;
public:
- DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
- DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
+ DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag);
+ DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId deliveryTag);
void dequeue(TransactionContext* ctxt = 0) const;
bool matches(DeliveryId tag) const;
@@ -60,6 +62,8 @@ namespace qpid {
void subtractFrom(Prefetch&) const;
const std::string& getConsumerTag() const { return consumerTag; }
bool isPull() const { return pull; }
+ bool isAcquired() const { return acquired; }
+ void setAcquired(bool isAcquired) { acquired = isAcquired; }
friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&);
};
diff --git a/qpid/cpp/src/tests/BrokerChannelTest.cpp b/qpid/cpp/src/tests/BrokerChannelTest.cpp
index 1e5a30f157..0787405eb7 100644
--- a/qpid/cpp/src/tests/BrokerChannelTest.cpp
+++ b/qpid/cpp/src/tests/BrokerChannelTest.cpp
@@ -256,13 +256,13 @@ class BrokerChannelTest : public CppUnit::TestCase
queue->deliver(msg3);
sleep(2);
- Message::shared_ptr next = queue->dequeue();
+ Message::shared_ptr next = queue->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg1, next);
CPPUNIT_ASSERT_EQUAL((uint32_t) data1.size(), next->encodedContentSize());
- next = queue->dequeue();
+ next = queue->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg2, next);
CPPUNIT_ASSERT_EQUAL((uint32_t) data2.size(), next->encodedContentSize());
- next = queue->dequeue();
+ next = queue->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg3, next);
CPPUNIT_ASSERT_EQUAL((uint32_t) 0, next->encodedContentSize());
@@ -295,11 +295,11 @@ class BrokerChannelTest : public CppUnit::TestCase
queue3->deliver(msg1);
sleep(2);
- Message::shared_ptr next = queue1->dequeue();
+ Message::shared_ptr next = queue1->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg1, next);
- next = queue2->dequeue();
+ next = queue2->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg1, next);
- next = queue3->dequeue();
+ next = queue3->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg1, next);
}
diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp
index ef1518af4c..bf742f9511 100644
--- a/qpid/cpp/src/tests/QueueTest.cpp
+++ b/qpid/cpp/src/tests/QueueTest.cpp
@@ -40,10 +40,10 @@ public:
bool received;
TestConsumer(): received(false) {};
- virtual bool deliver(Message::shared_ptr& msg){
- last = msg;
- received = true;
- return true;
+ virtual bool deliver(QueuedMessage& msg){
+ last = msg.payload;
+ received = true;
+ return true;
};
};
@@ -97,7 +97,7 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT(!c1.received);
msg1->enqueueComplete();
- received = queue->dequeue();
+ received = queue->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
@@ -190,11 +190,11 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(uint32_t(3), queue->getMessageCount());
- received = queue->dequeue();
+ received = queue->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getMessageCount());
- received = queue->dequeue();
+ received = queue->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount());
@@ -207,7 +207,7 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
- received = queue->dequeue();
+ received = queue->dequeue().payload;
CPPUNIT_ASSERT(!received);
CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
diff --git a/qpid/cpp/src/tests/TxAckTest.cpp b/qpid/cpp/src/tests/TxAckTest.cpp
index 89a907d495..65426e4e21 100644
--- a/qpid/cpp/src/tests/TxAckTest.cpp
+++ b/qpid/cpp/src/tests/TxAckTest.cpp
@@ -76,7 +76,9 @@ public:
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key");
messages.push_back(msg);
- deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
+ QueuedMessage qm;
+ qm.payload = msg;
+ deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1)));
}
//assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)
diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp
index 5628cf1d1c..4ec526f207 100644
--- a/qpid/cpp/src/tests/TxPublishTest.cpp
+++ b/qpid/cpp/src/tests/TxPublishTest.cpp
@@ -99,13 +99,13 @@ public:
op.prepare(0);
op.commit();
CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount());
- Message::shared_ptr msg_dequeue = queue1->dequeue();
+ Message::shared_ptr msg_dequeue = queue1->dequeue().payload;
CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg_dequeue.get())->isEnqueueComplete());
CPPUNIT_ASSERT_EQUAL(msg, msg_dequeue);
CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount());
- CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());
+ CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue().payload);
}
};