diff options
author | Gordon Sim <gsim@apache.org> | 2006-10-11 08:24:42 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-10-11 08:24:42 +0000 |
commit | 4fcd0a1f4d52dffe2c524af06882470dd4a48213 (patch) | |
tree | 7639836ccd43e6cf41372856735074fbb9e21443 /cpp | |
parent | 4b3a1e69274b04888866e3a239854dd061c57f98 (diff) | |
download | qpid-python-4fcd0a1f4d52dffe2c524af06882470dd4a48213.tar.gz |
Implementation of basic_get.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@462729 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/broker/inc/Channel.h | 32 | ||||
-rw-r--r-- | cpp/broker/inc/Message.h | 14 | ||||
-rw-r--r-- | cpp/broker/src/Channel.cpp | 53 | ||||
-rw-r--r-- | cpp/broker/src/Message.cpp | 16 | ||||
-rw-r--r-- | cpp/broker/src/Queue.cpp | 8 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 10 | ||||
-rw-r--r-- | cpp/broker/test/QueueTest.cpp (renamed from cpp/broker/test/queue_test.cpp) | 47 |
7 files changed, 154 insertions, 26 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h index 4f4d8e2890..a5a54aea1f 100644 --- a/cpp/broker/inc/Channel.h +++ b/cpp/broker/inc/Channel.h @@ -60,12 +60,24 @@ namespace qpid { Queue::shared_ptr queue; string consumerTag; u_int64_t deliveryTag; - - AckRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue, - string _consumerTag, u_int64_t _deliveryTag) : msg(_msg), - queue(_queue), - consumerTag(_consumerTag), - deliveryTag(_deliveryTag){} + bool pull; + + AckRecord(Message::shared_ptr _msg, + Queue::shared_ptr _queue, + const string _consumerTag, + const u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(_consumerTag), + deliveryTag(_deliveryTag), + pull(false){} + + AckRecord(Message::shared_ptr _msg, + Queue::shared_ptr _queue, + const u_int64_t _deliveryTag) : msg(_msg), + queue(_queue), + consumerTag(""), + deliveryTag(_deliveryTag), + pull(true){} }; typedef std::vector<AckRecord>::iterator ack_iterator; @@ -89,12 +101,14 @@ namespace qpid { void operator()(AckRecord& record) const; }; - class AddSize{ + class CalculatePrefetch{ u_int32_t size; + u_int16_t count; public: - AddSize(); + CalculatePrefetch(); void operator()(AckRecord& record); u_int32_t getSize(); + u_int16_t getCount(); }; const int id; @@ -106,6 +120,7 @@ namespace qpid { u_int32_t prefetchSize; u_int16_t prefetchCount; u_int32_t outstandingSize; + u_int16_t outstandingCount; u_int32_t framesize; Message::shared_ptr message; NameGenerator tagGenerator; @@ -136,6 +151,7 @@ namespace qpid { bool exists(const string& consumerTag); void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0); void cancel(const string& tag); + bool get(Queue::shared_ptr queue, bool ackExpected); void begin(); void close(); void commit(); diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h index 7b2c2bc848..94b9aa5bdd 100644 --- a/cpp/broker/inc/Message.h +++ b/cpp/broker/inc/Message.h @@ -49,6 +49,9 @@ namespace qpid { content_list content; u_int64_t size; + void sendContent(qpid::framing::OutputHandler* out, + int channel, u_int32_t framesize); + public: typedef std::tr1::shared_ptr<Message> shared_ptr; @@ -61,9 +64,16 @@ namespace qpid { bool isComplete(); const ConnectionToken* const getPublisher(); - void deliver(qpid::framing::OutputHandler* out, int channel, - string& consumerTag, u_int64_t deliveryTag, + void deliver(qpid::framing::OutputHandler* out, + int channel, + const string& consumerTag, + u_int64_t deliveryTag, u_int32_t framesize); + void sendGetOk(qpid::framing::OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize); void redeliver(); qpid::framing::BasicHeaderProperties* getHeaderProperties(); diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index f887e080a6..ed1125ee76 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -31,6 +31,7 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out) prefetchCount(0), prefetchSize(0), outstandingSize(0), + outstandingCount(0), framesize(_framesize), transactional(false), deliveryTag(1), @@ -98,6 +99,7 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar if(ackExpected){ unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); outstandingSize += msg->contentSize(); + outstandingCount++; } //send deliver method, header and content(s) msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); @@ -162,10 +164,15 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ throw InvalidAckException(); }else if(multiple){ unacknowledged.erase(unacknowledged.begin(), ++i); - //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?): - outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize(); + //recompute prefetch outstanding (note: messages delivered through get are ignored) + CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch())); + outstandingSize = calc.getSize(); + outstandingCount = calc.getCount(); }else{ - outstandingSize -= i->msg->contentSize(); + if(!i->pull){ + outstandingSize -= i->msg->contentSize(); + outstandingCount--; + } unacknowledged.erase(i); } @@ -181,6 +188,7 @@ void Channel::recover(bool requeue){ if(requeue){ outstandingSize = 0; + outstandingCount = 0; ack_iterator start(unacknowledged.begin()); ack_iterator end(unacknowledged.end()); for_each(start, end, Requeue()); @@ -190,6 +198,21 @@ void Channel::recover(bool requeue){ } } +bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ + Message::shared_ptr msg = queue->dequeue(); + if(msg){ + Locker locker(deliveryLock); + u_int64_t myDeliveryTag = deliveryTag++; + msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); + if(ackExpected){ + unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag)); + } + return true; + }else{ + return false; + } +} + Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {} bool Channel::MatchAck::operator()(AckRecord& record) const{ @@ -204,15 +227,29 @@ void Channel::Requeue::operator()(AckRecord& record) const{ Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} void Channel::Redeliver::operator()(AckRecord& record) const{ - record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); + if(record.pull){ + //if message was originally sent as response to get, we must requeue it + record.msg->redeliver(); + record.queue->deliver(record.msg); + }else{ + record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); + } } -Channel::AddSize::AddSize() : size(0){} +Channel::CalculatePrefetch::CalculatePrefetch() : size(0){} -void Channel::AddSize::operator()(AckRecord& record){ - size += record.msg->contentSize(); +void Channel::CalculatePrefetch::operator()(AckRecord& record){ + if(!record.pull){ + //ignore messages that were sent in response to get when calculating prefetch + size += record.msg->contentSize(); + count++; + } } -u_int32_t Channel::AddSize::getSize(){ +u_int32_t Channel::CalculatePrefetch::getSize(){ return size; } + +u_int16_t Channel::CalculatePrefetch::getCount(){ + return count; +} diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp index a44eeaab59..b0e5d16b77 100644 --- a/cpp/broker/src/Message.cpp +++ b/cpp/broker/src/Message.cpp @@ -59,10 +59,24 @@ void Message::redeliver(){ } void Message::deliver(OutputHandler* out, int channel, - string& consumerTag, u_int64_t deliveryTag, + const string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize){ out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); + sendContent(out, channel, framesize); +} + +void Message::sendGetOk(OutputHandler* out, + int channel, + u_int32_t messageCount, + u_int64_t deliveryTag, + u_int32_t framesize){ + + out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount))); + sendContent(out, channel, framesize); +} + +void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){ AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); out->send(new AMQFrame(channel, headerBody)); for(content_iterator i = content.begin(); i != content.end(); i++){ diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp index f7b8605b03..1db4454235 100644 --- a/cpp/broker/src/Queue.cpp +++ b/cpp/broker/src/Queue.cpp @@ -122,7 +122,13 @@ void Queue::cancel(Consumer* c){ } Message::shared_ptr Queue::dequeue(){ - + Locker locker(lock); + Message::shared_ptr msg; + if(!messages.empty()){ + msg = messages.front(); + messages.pop(); + } + return msg; } u_int32_t Queue::purge(){ diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 857730f3f7..ad73c1b23b 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -338,7 +338,6 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){ //TODO: handle global - //TODO: channel doesn't do anything with these qos parameters yet parent->getChannel(channel)->setPrefetchSize(prefetchSize); parent->getChannel(channel)->setPrefetchCount(prefetchCount); parent->client.getBasic().qosOk(channel); @@ -349,7 +348,6 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_ bool noLocal, bool noAck, bool exclusive, bool nowait){ - //TODO: implement nolocal Queue::shared_ptr queue = parent->getQueue(queueName, channelId); Channel* channel = parent->channels[channelId]; if(!consumerTag.empty() && channel->exists(consumerTag)){ @@ -382,7 +380,13 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t parent->getChannel(channel)->handlePublish(msg); } -void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){} +void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t ticket, string& queueName, bool noAck){ + Queue::shared_ptr queue = parent->getQueue(queueName, channelId); + if(!parent->getChannel(channelId)->get(queue, !noAck)){ + string clusterId;//not used, part of an imatix hack + parent->client.getBasic().getEmpty(channelId, clusterId); + } +} void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ try{ diff --git a/cpp/broker/test/queue_test.cpp b/cpp/broker/test/QueueTest.cpp index aa423e7e08..973b1b5cf6 100644 --- a/cpp/broker/test/queue_test.cpp +++ b/cpp/broker/test/QueueTest.cpp @@ -47,12 +47,14 @@ public: class QueueTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(QueueTest); - CPPUNIT_TEST(testMe); + CPPUNIT_TEST(testConsumers); + CPPUNIT_TEST(testBinding); + CPPUNIT_TEST(testRegistry); + CPPUNIT_TEST(testDequeue); CPPUNIT_TEST_SUITE_END(); public: - void testMe() - { + void testConsumers(){ Queue::shared_ptr queue(new Queue("my_queue", true, true)); //Test adding consumers: @@ -82,7 +84,10 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount()); queue->cancel(&c2); CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount()); + } + void testBinding(){ + Queue::shared_ptr queue(new Queue("my_queue", true, true)); //Test bindings: TestBinding a; TestBinding b; @@ -93,7 +98,9 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT(a.isCancelled()); CPPUNIT_ASSERT(b.isCancelled()); + } + void testRegistry(){ //Test use of queues in registry: QueueRegistry registry; registry.declare("queue1", true, true); @@ -112,6 +119,40 @@ class QueueTest : public CppUnit::TestCase CPPUNIT_ASSERT(!registry.find("queue2")); CPPUNIT_ASSERT(!registry.find("queue3")); } + + void testDequeue(){ + Queue::shared_ptr queue(new Queue("my_queue", true, true)); + + Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true)); + Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true)); + Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true)); + Message::shared_ptr received; + + queue->deliver(msg1); + queue->deliver(msg2); + queue->deliver(msg3); + + CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get()); + CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get()); + CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount()); + + TestConsumer consumer; + queue->consume(&consumer); + queue->dispatch(); + CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get()); + CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount()); + + received = queue->dequeue(); + CPPUNIT_ASSERT(!received); + CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount()); + + } }; // Make this test suite a plugin. |