summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-10-11 08:24:42 +0000
committerGordon Sim <gsim@apache.org>2006-10-11 08:24:42 +0000
commit4fcd0a1f4d52dffe2c524af06882470dd4a48213 (patch)
tree7639836ccd43e6cf41372856735074fbb9e21443 /cpp
parent4b3a1e69274b04888866e3a239854dd061c57f98 (diff)
downloadqpid-python-4fcd0a1f4d52dffe2c524af06882470dd4a48213.tar.gz
Implementation of basic_get.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@462729 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/broker/inc/Channel.h32
-rw-r--r--cpp/broker/inc/Message.h14
-rw-r--r--cpp/broker/src/Channel.cpp53
-rw-r--r--cpp/broker/src/Message.cpp16
-rw-r--r--cpp/broker/src/Queue.cpp8
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp10
-rw-r--r--cpp/broker/test/QueueTest.cpp (renamed from cpp/broker/test/queue_test.cpp)47
7 files changed, 154 insertions, 26 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h
index 4f4d8e2890..a5a54aea1f 100644
--- a/cpp/broker/inc/Channel.h
+++ b/cpp/broker/inc/Channel.h
@@ -60,12 +60,24 @@ namespace qpid {
Queue::shared_ptr queue;
string consumerTag;
u_int64_t deliveryTag;
-
- AckRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue,
- string _consumerTag, u_int64_t _deliveryTag) : msg(_msg),
- queue(_queue),
- consumerTag(_consumerTag),
- deliveryTag(_deliveryTag){}
+ bool pull;
+
+ AckRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const string _consumerTag,
+ const u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(_consumerTag),
+ deliveryTag(_deliveryTag),
+ pull(false){}
+
+ AckRecord(Message::shared_ptr _msg,
+ Queue::shared_ptr _queue,
+ const u_int64_t _deliveryTag) : msg(_msg),
+ queue(_queue),
+ consumerTag(""),
+ deliveryTag(_deliveryTag),
+ pull(true){}
};
typedef std::vector<AckRecord>::iterator ack_iterator;
@@ -89,12 +101,14 @@ namespace qpid {
void operator()(AckRecord& record) const;
};
- class AddSize{
+ class CalculatePrefetch{
u_int32_t size;
+ u_int16_t count;
public:
- AddSize();
+ CalculatePrefetch();
void operator()(AckRecord& record);
u_int32_t getSize();
+ u_int16_t getCount();
};
const int id;
@@ -106,6 +120,7 @@ namespace qpid {
u_int32_t prefetchSize;
u_int16_t prefetchCount;
u_int32_t outstandingSize;
+ u_int16_t outstandingCount;
u_int32_t framesize;
Message::shared_ptr message;
NameGenerator tagGenerator;
@@ -136,6 +151,7 @@ namespace qpid {
bool exists(const string& consumerTag);
void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0);
void cancel(const string& tag);
+ bool get(Queue::shared_ptr queue, bool ackExpected);
void begin();
void close();
void commit();
diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h
index 7b2c2bc848..94b9aa5bdd 100644
--- a/cpp/broker/inc/Message.h
+++ b/cpp/broker/inc/Message.h
@@ -49,6 +49,9 @@ namespace qpid {
content_list content;
u_int64_t size;
+ void sendContent(qpid::framing::OutputHandler* out,
+ int channel, u_int32_t framesize);
+
public:
typedef std::tr1::shared_ptr<Message> shared_ptr;
@@ -61,9 +64,16 @@ namespace qpid {
bool isComplete();
const ConnectionToken* const getPublisher();
- void deliver(qpid::framing::OutputHandler* out, int channel,
- string& consumerTag, u_int64_t deliveryTag,
+ void deliver(qpid::framing::OutputHandler* out,
+ int channel,
+ const string& consumerTag,
+ u_int64_t deliveryTag,
u_int32_t framesize);
+ void sendGetOk(qpid::framing::OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize);
void redeliver();
qpid::framing::BasicHeaderProperties* getHeaderProperties();
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
index f887e080a6..ed1125ee76 100644
--- a/cpp/broker/src/Channel.cpp
+++ b/cpp/broker/src/Channel.cpp
@@ -31,6 +31,7 @@ Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out)
prefetchCount(0),
prefetchSize(0),
outstandingSize(0),
+ outstandingCount(0),
framesize(_framesize),
transactional(false),
deliveryTag(1),
@@ -98,6 +99,7 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar
if(ackExpected){
unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
outstandingSize += msg->contentSize();
+ outstandingCount++;
}
//send deliver method, header and content(s)
msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
@@ -162,10 +164,15 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
throw InvalidAckException();
}else if(multiple){
unacknowledged.erase(unacknowledged.begin(), ++i);
- //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?):
- outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize();
+ //recompute prefetch outstanding (note: messages delivered through get are ignored)
+ CalculatePrefetch calc(for_each(unacknowledged.begin(), unacknowledged.end(), CalculatePrefetch()));
+ outstandingSize = calc.getSize();
+ outstandingCount = calc.getCount();
}else{
- outstandingSize -= i->msg->contentSize();
+ if(!i->pull){
+ outstandingSize -= i->msg->contentSize();
+ outstandingCount--;
+ }
unacknowledged.erase(i);
}
@@ -181,6 +188,7 @@ void Channel::recover(bool requeue){
if(requeue){
outstandingSize = 0;
+ outstandingCount = 0;
ack_iterator start(unacknowledged.begin());
ack_iterator end(unacknowledged.end());
for_each(start, end, Requeue());
@@ -190,6 +198,21 @@ void Channel::recover(bool requeue){
}
}
+bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
+ Message::shared_ptr msg = queue->dequeue();
+ if(msg){
+ Locker locker(deliveryLock);
+ u_int64_t myDeliveryTag = deliveryTag++;
+ msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize);
+ if(ackExpected){
+ unacknowledged.push_back(AckRecord(msg, queue, myDeliveryTag));
+ }
+ return true;
+ }else{
+ return false;
+ }
+}
+
Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
bool Channel::MatchAck::operator()(AckRecord& record) const{
@@ -204,15 +227,29 @@ void Channel::Requeue::operator()(AckRecord& record) const{
Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
void Channel::Redeliver::operator()(AckRecord& record) const{
- record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
+ if(record.pull){
+ //if message was originally sent as response to get, we must requeue it
+ record.msg->redeliver();
+ record.queue->deliver(record.msg);
+ }else{
+ record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
+ }
}
-Channel::AddSize::AddSize() : size(0){}
+Channel::CalculatePrefetch::CalculatePrefetch() : size(0){}
-void Channel::AddSize::operator()(AckRecord& record){
- size += record.msg->contentSize();
+void Channel::CalculatePrefetch::operator()(AckRecord& record){
+ if(!record.pull){
+ //ignore messages that were sent in response to get when calculating prefetch
+ size += record.msg->contentSize();
+ count++;
+ }
}
-u_int32_t Channel::AddSize::getSize(){
+u_int32_t Channel::CalculatePrefetch::getSize(){
return size;
}
+
+u_int16_t Channel::CalculatePrefetch::getCount(){
+ return count;
+}
diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp
index a44eeaab59..b0e5d16b77 100644
--- a/cpp/broker/src/Message.cpp
+++ b/cpp/broker/src/Message.cpp
@@ -59,10 +59,24 @@ void Message::redeliver(){
}
void Message::deliver(OutputHandler* out, int channel,
- string& consumerTag, u_int64_t deliveryTag,
+ const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize){
out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
+ sendContent(out, channel, framesize);
+}
+
+void Message::sendGetOk(OutputHandler* out,
+ int channel,
+ u_int32_t messageCount,
+ u_int64_t deliveryTag,
+ u_int32_t framesize){
+
+ out->send(new AMQFrame(channel, new BasicGetOkBody(deliveryTag, redelivered, exchange, routingKey, messageCount)));
+ sendContent(out, channel, framesize);
+}
+
+void Message::sendContent(OutputHandler* out, int channel, u_int32_t framesize){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(channel, headerBody));
for(content_iterator i = content.begin(); i != content.end(); i++){
diff --git a/cpp/broker/src/Queue.cpp b/cpp/broker/src/Queue.cpp
index f7b8605b03..1db4454235 100644
--- a/cpp/broker/src/Queue.cpp
+++ b/cpp/broker/src/Queue.cpp
@@ -122,7 +122,13 @@ void Queue::cancel(Consumer* c){
}
Message::shared_ptr Queue::dequeue(){
-
+ Locker locker(lock);
+ Message::shared_ptr msg;
+ if(!messages.empty()){
+ msg = messages.front();
+ messages.pop();
+ }
+ return msg;
}
u_int32_t Queue::purge(){
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
index 857730f3f7..ad73c1b23b 100644
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -338,7 +338,6 @@ void SessionHandlerImpl::QueueHandlerImpl::delete_(u_int16_t channel, u_int16_t
void SessionHandlerImpl::BasicHandlerImpl::qos(u_int16_t channel, u_int32_t prefetchSize, u_int16_t prefetchCount, bool global){
//TODO: handle global
- //TODO: channel doesn't do anything with these qos parameters yet
parent->getChannel(channel)->setPrefetchSize(prefetchSize);
parent->getChannel(channel)->setPrefetchCount(prefetchCount);
parent->client.getBasic().qosOk(channel);
@@ -349,7 +348,6 @@ void SessionHandlerImpl::BasicHandlerImpl::consume(u_int16_t channelId, u_int16_
bool noLocal, bool noAck, bool exclusive,
bool nowait){
- //TODO: implement nolocal
Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
Channel* channel = parent->channels[channelId];
if(!consumerTag.empty() && channel->exists(consumerTag)){
@@ -382,7 +380,13 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
parent->getChannel(channel)->handlePublish(msg);
}
-void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){}
+void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channelId, u_int16_t ticket, string& queueName, bool noAck){
+ Queue::shared_ptr queue = parent->getQueue(queueName, channelId);
+ if(!parent->getChannel(channelId)->get(queue, !noAck)){
+ string clusterId;//not used, part of an imatix hack
+ parent->client.getBasic().getEmpty(channelId, clusterId);
+ }
+}
void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
try{
diff --git a/cpp/broker/test/queue_test.cpp b/cpp/broker/test/QueueTest.cpp
index aa423e7e08..973b1b5cf6 100644
--- a/cpp/broker/test/queue_test.cpp
+++ b/cpp/broker/test/QueueTest.cpp
@@ -47,12 +47,14 @@ public:
class QueueTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(QueueTest);
- CPPUNIT_TEST(testMe);
+ CPPUNIT_TEST(testConsumers);
+ CPPUNIT_TEST(testBinding);
+ CPPUNIT_TEST(testRegistry);
+ CPPUNIT_TEST(testDequeue);
CPPUNIT_TEST_SUITE_END();
public:
- void testMe()
- {
+ void testConsumers(){
Queue::shared_ptr queue(new Queue("my_queue", true, true));
//Test adding consumers:
@@ -82,7 +84,10 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getConsumerCount());
queue->cancel(&c2);
CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getConsumerCount());
+ }
+ void testBinding(){
+ Queue::shared_ptr queue(new Queue("my_queue", true, true));
//Test bindings:
TestBinding a;
TestBinding b;
@@ -93,7 +98,9 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT(a.isCancelled());
CPPUNIT_ASSERT(b.isCancelled());
+ }
+ void testRegistry(){
//Test use of queues in registry:
QueueRegistry registry;
registry.declare("queue1", true, true);
@@ -112,6 +119,40 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT(!registry.find("queue2"));
CPPUNIT_ASSERT(!registry.find("queue3"));
}
+
+ void testDequeue(){
+ Queue::shared_ptr queue(new Queue("my_queue", true, true));
+
+ Message::shared_ptr msg1 = Message::shared_ptr(new Message(0, "e", "A", true, true));
+ Message::shared_ptr msg2 = Message::shared_ptr(new Message(0, "e", "B", true, true));
+ Message::shared_ptr msg3 = Message::shared_ptr(new Message(0, "e", "C", true, true));
+ Message::shared_ptr received;
+
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+ queue->deliver(msg3);
+
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(3), queue->getMessageCount());
+
+ received = queue->dequeue();
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(2), queue->getMessageCount());
+
+ received = queue->dequeue();
+ CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(1), queue->getMessageCount());
+
+ TestConsumer consumer;
+ queue->consume(&consumer);
+ queue->dispatch();
+ CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
+
+ received = queue->dequeue();
+ CPPUNIT_ASSERT(!received);
+ CPPUNIT_ASSERT_EQUAL(u_int32_t(0), queue->getMessageCount());
+
+ }
};
// Make this test suite a plugin.