diff options
author | Gordon Sim <gsim@apache.org> | 2007-08-31 16:45:20 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-08-31 16:45:20 +0000 |
commit | f9236f2f81a1df20a4a95d2e8dc8538b33fb4746 (patch) | |
tree | 66570f8ee6b0adaf5906cd724debe3ed5404d3f2 /qpid/cpp | |
parent | 0c9a820ac910c913e0a256f3d292111ebf2efa37 (diff) | |
download | qpid-python-f9236f2f81a1df20a4a95d2e8dc8538b33fb4746.tar.gz |
Pass QueuedMessage to queues consumers. This records the position of that message in the queue which is need to handle rlease and acquire.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@571518 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerChannel.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerChannel.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerQueue.cpp | 38 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/BrokerQueue.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Consumer.h | 13 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.cpp | 34 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/DeliveryRecord.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/tests/BrokerChannelTest.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/tests/QueueTest.cpp | 16 | ||||
-rw-r--r-- | qpid/cpp/src/tests/TxAckTest.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/tests/TxPublishTest.cpp | 4 |
11 files changed, 96 insertions, 79 deletions
diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp index 615a26beab..ceecdf3040 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.cpp @@ -218,31 +218,33 @@ Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _name, Queue::shared_ptr _queue, bool ack, - bool _nolocal + bool _nolocal, + bool _acquire ) : parent(_parent), token(_token), name(_name), queue(_queue), ackExpected(ack), nolocal(_nolocal), + acquire(_acquire), blocked(false), windowing(true), msgCredit(0xFFFFFFFF), byteCredit(0xFFFFFFFF) {} -bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg) +bool Channel::ConsumerImpl::deliver(QueuedMessage& msg) { - if (nolocal && &(parent->connection) == msg->getPublisher()) { + if (nolocal && &(parent->connection) == msg.payload->getPublisher()) { return false; } else { - if (!checkCredit(msg) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))) { + if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg.payload))) { blocked = true; } else { blocked = false; Mutex::ScopedLock locker(parent->deliveryLock); - DeliveryId deliveryTag = parent->out.deliver(msg, token); + DeliveryId deliveryTag = parent->out.deliver(msg.payload, token); if (ackExpected) { parent->record(DeliveryRecord(msg, queue, name, deliveryTag)); } @@ -409,10 +411,10 @@ void Channel::recover(bool requeue) bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected) { - Message::shared_ptr msg = queue->dequeue(); - if(msg){ + QueuedMessage msg = queue->dequeue(); + if(msg.payload){ Mutex::ScopedLock locker(deliveryLock); - DeliveryId myDeliveryTag = out.deliver(msg, token); + DeliveryId myDeliveryTag = out.deliver(msg.payload, token); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } diff --git a/qpid/cpp/src/qpid/broker/BrokerChannel.h b/qpid/cpp/src/qpid/broker/BrokerChannel.h index cdbab37ebc..98ee073d3d 100644 --- a/qpid/cpp/src/qpid/broker/BrokerChannel.h +++ b/qpid/cpp/src/qpid/broker/BrokerChannel.h @@ -69,17 +69,20 @@ class Channel const Queue::shared_ptr queue; const bool ackExpected; const bool nolocal; + const bool acquire; bool blocked; bool windowing; - uint32_t msgCredit; - + uint32_t msgCredit; uint32_t byteCredit; + bool checkCredit(Message::shared_ptr& msg); + public: ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token, - const string& name, Queue::shared_ptr queue, bool ack, bool nolocal); + const string& name, Queue::shared_ptr queue, + bool ack, bool nolocal, bool acquire = true); ~ConsumerImpl(); - bool deliver(Message::shared_ptr& msg); + bool deliver(QueuedMessage& msg); void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag); void cancel(); void requestDispatch(); @@ -90,7 +93,6 @@ class Channel void addMessageCredit(uint32_t value); void flush(); void stop(); - bool checkCredit(Message::shared_ptr& msg); void acknowledged(const DeliveryRecord&); }; diff --git a/qpid/cpp/src/qpid/broker/BrokerQueue.cpp b/qpid/cpp/src/qpid/broker/BrokerQueue.cpp index 7311d043d0..553f6016d2 100644 --- a/qpid/cpp/src/qpid/broker/BrokerQueue.cpp +++ b/qpid/cpp/src/qpid/broker/BrokerQueue.cpp @@ -102,10 +102,10 @@ void Queue::process(Message::shared_ptr& msg){ } -void Queue::requeue(Message::shared_ptr& msg){ +void Queue::requeue(const QueuedMessage& msg){ { Mutex::ScopedLock locker(messageLock); - msg->enqueueComplete(); // mark the message as enqueued + msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); } serializer.execute(dispatchCallback); @@ -118,7 +118,7 @@ void Queue::requestDispatch(){ } -bool Queue::dispatch(Message::shared_ptr& msg){ +bool Queue::dispatch(QueuedMessage& msg){ RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide.... @@ -144,21 +144,19 @@ bool Queue::dispatch(Message::shared_ptr& msg){ void Queue::dispatch(){ - - - Message::shared_ptr msg; + QueuedMessage msg; while(true){ { Mutex::ScopedLock locker(messageLock); if (messages.empty()) break; msg = messages.front(); } - if( msg->isEnqueueComplete() && dispatch(msg) ){ + if( msg.payload->isEnqueueComplete() && dispatch(msg) ) { pop(); - }else break; - - } - + } else { + break; + } + } } void Queue::consume(Consumer* c, bool requestExclusive){ @@ -185,18 +183,16 @@ void Queue::cancel(Consumer* c){ if(exclusive == c) exclusive = 0; } -Message::shared_ptr Queue::dequeue(){ +QueuedMessage Queue::dequeue(){ Mutex::ScopedLock locker(messageLock); - Message::shared_ptr msg; + QueuedMessage msg; if(!messages.empty()){ msg = messages.front(); - if (msg->isEnqueueComplete()){ + if (msg.payload->isEnqueueComplete()){ pop(); - return msg; } } - Message::shared_ptr msg_empty; - return msg_empty; + return msg; } uint32_t Queue::purge(){ @@ -208,13 +204,13 @@ uint32_t Queue::purge(){ void Queue::pop(){ Mutex::ScopedLock locker(messageLock); - if (policy.get()) policy->dequeued(messages.front()->contentSize()); + if (policy.get()) policy->dequeued(messages.front().payload->contentSize()); messages.pop_front(); } void Queue::push(Message::shared_ptr& msg){ Mutex::ScopedLock locker(messageLock); - messages.push_back(msg); + messages.push_back(QueuedMessage(msg, ++sequence)); if (policy.get()) { policy->enqueued(msg->contentSize()); if (policy->limitExceeded()) { @@ -229,7 +225,7 @@ uint32_t Queue::getMessageCount() const{ uint32_t count =0; for ( Messages::const_iterator i = messages.begin(); i != messages.end(); ++i ) { - if ( (*i)->isEnqueueComplete() ) count ++; + if ( i->payload->isEnqueueComplete() ) count ++; } return count; @@ -296,7 +292,7 @@ void Queue::destroy() if (alternateExchange.get()) { Mutex::ScopedLock locker(messageLock); while(!messages.empty()){ - DeliverableMessage msg(messages.front()); + DeliverableMessage msg(messages.front().payload); alternateExchange->route(msg, msg.getMessage().getRoutingKey(), &(msg.getMessage().getApplicationHeaders())); pop(); diff --git a/qpid/cpp/src/qpid/broker/BrokerQueue.h b/qpid/cpp/src/qpid/broker/BrokerQueue.h index 5ba103d3ed..d15b5fc8c5 100644 --- a/qpid/cpp/src/qpid/broker/BrokerQueue.h +++ b/qpid/cpp/src/qpid/broker/BrokerQueue.h @@ -46,9 +46,6 @@ namespace qpid { class TransactionContext; class Exchange; - /** - * Thrown when exclusive access would be violated. - */ using std::string; /** @@ -59,7 +56,7 @@ namespace qpid { */ class Queue : public PersistableQueue{ typedef std::vector<Consumer*> Consumers; - typedef std::deque<Message::shared_ptr> Messages; + typedef std::deque<QueuedMessage> Messages; struct DispatchFunctor { Queue& queue; @@ -84,10 +81,11 @@ namespace qpid { boost::shared_ptr<Exchange> alternateExchange; qpid::sys::Serializer<DispatchFunctor> serializer; DispatchFunctor dispatchCallback; + framing::SequenceNumber sequence; void pop(); void push(Message::shared_ptr& msg); - bool dispatch(Message::shared_ptr& msg); + bool dispatch(QueuedMessage& msg); void setPolicy(std::auto_ptr<QueuePolicy> policy); /** * only called by serilizer @@ -132,7 +130,7 @@ namespace qpid { * available it will be dispatched immediately, else it * will be returned to the front of the queue. */ - void requeue(Message::shared_ptr& msg); + void requeue(const QueuedMessage& msg); /** * Used during recovery to add stored messages back to the queue */ @@ -166,7 +164,7 @@ namespace qpid { /** * dequeues from memory only */ - Message::shared_ptr dequeue(); + QueuedMessage dequeue(); const QueuePolicy* const getPolicy(); diff --git a/qpid/cpp/src/qpid/broker/Consumer.h b/qpid/cpp/src/qpid/broker/Consumer.h index dc229947b9..52da25082c 100644 --- a/qpid/cpp/src/qpid/broker/Consumer.h +++ b/qpid/cpp/src/qpid/broker/Consumer.h @@ -25,9 +25,20 @@ namespace qpid { namespace broker { + + struct QueuedMessage + { + Message::shared_ptr payload; + framing::SequenceNumber position; + + QueuedMessage(Message::shared_ptr msg, framing::SequenceNumber sn) : payload(msg), position(sn) {} + QueuedMessage() {} + }; + + class Consumer{ public: - virtual bool deliver(Message::shared_ptr& msg) = 0; + virtual bool deliver(QueuedMessage& msg) = 0; virtual ~Consumer(){} }; } diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp index 43f85b9b6e..f0239ed261 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -24,26 +24,28 @@ using namespace qpid::broker; using std::string; -DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, +DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, Queue::shared_ptr _queue, const string _consumerTag, const DeliveryId _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(_consumerTag), - deliveryTag(_deliveryTag), - pull(false){} + queue(_queue), + consumerTag(_consumerTag), + deliveryTag(_deliveryTag), + acquired(false), + pull(false){} -DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, +DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, Queue::shared_ptr _queue, const DeliveryId _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(""), - deliveryTag(_deliveryTag), - pull(true){} + queue(_queue), + consumerTag(""), + deliveryTag(_deliveryTag), + acquired(false), + pull(true){} void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ - queue->dequeue(ctxt, msg); + queue->dequeue(ctxt, msg.payload); } bool DeliveryRecord::matches(DeliveryId tag) const{ @@ -67,18 +69,18 @@ void DeliveryRecord::redeliver(Channel* const channel) const{ //if message was originally sent as response to get, we must requeue it requeue(); }else{ - channel->deliver(msg, consumerTag, deliveryTag); + channel->deliver(msg.payload, consumerTag, deliveryTag); } } void DeliveryRecord::requeue() const{ - msg->redeliver(); + msg.payload->redeliver(); queue->requeue(msg); } void DeliveryRecord::updateByteCredit(uint32_t& credit) const { - credit += msg->getRequiredCredit(); + credit += msg.payload->getRequiredCredit(); } @@ -86,7 +88,7 @@ void DeliveryRecord::addTo(Prefetch& prefetch) const{ if(!pull){ //ignore 'pulled' messages (i.e. those that were sent in //response to get) when calculating prefetch - prefetch.size += msg->contentSize(); + prefetch.size += msg.payload->contentSize(); prefetch.count++; } } @@ -95,7 +97,7 @@ void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{ if(!pull){ //ignore 'pulled' messages (i.e. those that were sent in //response to get) when calculating prefetch - prefetch.size -= msg->contentSize(); + prefetch.size -= msg.payload->contentSize(); prefetch.count--; } } diff --git a/qpid/cpp/src/qpid/broker/DeliveryRecord.h b/qpid/cpp/src/qpid/broker/DeliveryRecord.h index a1f82cb757..a1086488c4 100644 --- a/qpid/cpp/src/qpid/broker/DeliveryRecord.h +++ b/qpid/cpp/src/qpid/broker/DeliveryRecord.h @@ -26,6 +26,7 @@ #include <ostream> #include "AccumulatedAck.h" #include "BrokerQueue.h" +#include "Consumer.h" #include "DeliveryId.h" #include "Message.h" #include "Prefetch.h" @@ -38,15 +39,16 @@ namespace qpid { * Record of a delivery for which an ack is outstanding. */ class DeliveryRecord{ - mutable Message::shared_ptr msg; + mutable QueuedMessage msg; mutable Queue::shared_ptr queue; const std::string consumerTag; const DeliveryId deliveryTag; - bool pull; + bool acquired; + const bool pull; public: - DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag); - DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const DeliveryId deliveryTag); + DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag); + DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const DeliveryId deliveryTag); void dequeue(TransactionContext* ctxt = 0) const; bool matches(DeliveryId tag) const; @@ -60,6 +62,8 @@ namespace qpid { void subtractFrom(Prefetch&) const; const std::string& getConsumerTag() const { return consumerTag; } bool isPull() const { return pull; } + bool isAcquired() const { return acquired; } + void setAcquired(bool isAcquired) { acquired = isAcquired; } friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; diff --git a/qpid/cpp/src/tests/BrokerChannelTest.cpp b/qpid/cpp/src/tests/BrokerChannelTest.cpp index 1e5a30f157..0787405eb7 100644 --- a/qpid/cpp/src/tests/BrokerChannelTest.cpp +++ b/qpid/cpp/src/tests/BrokerChannelTest.cpp @@ -256,13 +256,13 @@ class BrokerChannelTest : public CppUnit::TestCase queue->deliver(msg3); sleep(2); - Message::shared_ptr next = queue->dequeue(); + Message::shared_ptr next = queue->dequeue().payload; CPPUNIT_ASSERT_EQUAL(msg1, next); CPPUNIT_ASSERT_EQUAL((uint32_t) data1.size(), next->encodedContentSize()); - next = queue->dequeue(); + next = queue->dequeue().payload; CPPUNIT_ASSERT_EQUAL(msg2, next); CPPUNIT_ASSERT_EQUAL((uint32_t) data2.size(), next->encodedContentSize()); - next = queue->dequeue(); + next = queue->dequeue().payload; CPPUNIT_ASSERT_EQUAL(msg3, next); CPPUNIT_ASSERT_EQUAL((uint32_t) 0, next->encodedContentSize()); @@ -295,11 +295,11 @@ class BrokerChannelTest : public CppUnit::TestCase queue3->deliver(msg1); sleep(2); - Message::shared_ptr next = queue1->dequeue(); + Message::shared_ptr next = queue1->dequeue().payload; CPPUNIT_ASSERT_EQUAL(msg1, next); - next = queue2->dequeue(); + next = queue2->dequeue().payload; CPPUNIT_ASSERT_EQUAL(msg1, next); - next = queue3->dequeue(); + next = queue3->dequeue().payload; CPPUNIT_ASSERT_EQUAL(msg1, next); } diff --git a/qpid/cpp/src/tests/QueueTest.cpp b/qpid/cpp/src/tests/QueueTest.cpp index ef1518af4c..bf742f9511 100644 --- a/qpid/cpp/src/tests/QueueTest.cpp +++ b/qpid/cpp/src/tests/QueueTest.cpp @@ -40,10 +40,10 @@ public: bool received; TestConsumer(): received(false) {}; - virtual bool deliver(Message::shared_ptr& msg){ - last = msg; - received = true; - return true; + virtual bool deliver(QueuedMessage& msg){ + last = msg.payload; + received = true; + return true; }; }; @@ -97,7 +97,7 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT(!c1.received); msg1->enqueueComplete(); - received = queue->dequeue(); + received = queue->dequeue().payload; CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get()); @@ -190,11 +190,11 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(uint32_t(3), queue->getMessageCount()); - received = queue->dequeue(); + received = queue->dequeue().payload; CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get()); CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getMessageCount()); - received = queue->dequeue(); + received = queue->dequeue().payload; CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount()); @@ -207,7 +207,7 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount()); - received = queue->dequeue(); + received = queue->dequeue().payload; CPPUNIT_ASSERT(!received); CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount()); diff --git a/qpid/cpp/src/tests/TxAckTest.cpp b/qpid/cpp/src/tests/TxAckTest.cpp index 89a907d495..65426e4e21 100644 --- a/qpid/cpp/src/tests/TxAckTest.cpp +++ b/qpid/cpp/src/tests/TxAckTest.cpp @@ -76,7 +76,9 @@ public: msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT); msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key"); messages.push_back(msg); - deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1))); + QueuedMessage qm; + qm.payload = msg; + deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1))); } //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) diff --git a/qpid/cpp/src/tests/TxPublishTest.cpp b/qpid/cpp/src/tests/TxPublishTest.cpp index 5628cf1d1c..4ec526f207 100644 --- a/qpid/cpp/src/tests/TxPublishTest.cpp +++ b/qpid/cpp/src/tests/TxPublishTest.cpp @@ -99,13 +99,13 @@ public: op.prepare(0); op.commit(); CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount()); - Message::shared_ptr msg_dequeue = queue1->dequeue(); + Message::shared_ptr msg_dequeue = queue1->dequeue().payload; CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg_dequeue.get())->isEnqueueComplete()); CPPUNIT_ASSERT_EQUAL(msg, msg_dequeue); CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount()); - CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue()); + CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue().payload); } }; |