summaryrefslogtreecommitdiff
path: root/cpp/broker/src/Channel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-09-27 13:21:43 +0000
committerGordon Sim <gsim@apache.org>2006-09-27 13:21:43 +0000
commit72aff4cc22932bd98b542ac9dd6393e23985e312 (patch)
treed3159aa7eeba203fea961b7397809ad805c74e1a /cpp/broker/src/Channel.cpp
parentf1745fde1a2164878794f0bcf62fa62aacbe5cae (diff)
downloadqpid-python-72aff4cc22932bd98b542ac9dd6393e23985e312.tar.gz
Initial implementation of basic_ack & basic_recover
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450434 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/src/Channel.cpp')
-rw-r--r--cpp/broker/src/Channel.cpp61
1 files changed, 53 insertions, 8 deletions
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
index f1f7d63a39..b49635e026 100644
--- a/cpp/broker/src/Channel.cpp
+++ b/cpp/broker/src/Channel.cpp
@@ -25,6 +25,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::concurrent;
+
Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out),
id(_id),
framesize(_framesize),
@@ -46,7 +47,7 @@ bool Channel::exists(string& consumerTag){
void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){
if(tag.empty()) tag = tagGenerator.generate();
- ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection));
+ ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks));
try{
queue->consume(c, exclusive);//may throw exception
consumers[tag] = c;
@@ -92,22 +93,29 @@ void Channel::rollback(){
}
-void Channel::deliver(Message::shared_ptr& msg, string& consumerTag){
+void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){
+ Locker locker(deliveryLock);
+
+ u_int64_t myDeliveryTag = deliveryTag++;
+ if(ackExpected){
+ unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
+ }
//send deliver method, header and content(s)
- msg->deliver(out, id, consumerTag, deliveryTag++, framesize);
+ msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
}
Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
Queue::shared_ptr _queue,
- ConnectionToken* const _connection) : parent(_parent),
- tag(_tag),
- queue(_queue),
- connection(_connection){
+ ConnectionToken* const _connection, bool ack) : parent(_parent),
+ tag(_tag),
+ queue(_queue),
+ connection(_connection),
+ ackExpected(ack){
}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(connection != msg->getPublisher()){
- parent->deliver(msg, tag);
+ parent->deliver(msg, tag, queue, ackExpected);
return true;
}else{
return false;
@@ -151,3 +159,40 @@ void Channel::publish(ExchangeRegistry* exchanges){
}
message.reset();
}
+
+void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(deliveryTag));
+ if(i == unacknowledged.end()){
+ //error: how should this be signalled?
+ }else if(multiple){
+ unacknowledged.erase(unacknowledged.begin(), ++i);
+ }else{
+ unacknowledged.erase(i);
+ }
+}
+
+void Channel::recover(bool requeue){
+ if(requeue){
+ //TODO: need to set redelivered flag
+ for_each(unacknowledged.begin(), unacknowledged.end(), Requeue());
+ unacknowledged.clear();
+ }else{
+ for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
+ }
+}
+
+Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {}
+
+bool Channel::MatchAck::operator()(AckRecord& record) const{
+ return tag == record.deliveryTag;
+}
+
+void Channel::Requeue::operator()(AckRecord& record) const{
+ record.queue->deliver(record.msg);
+}
+
+Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
+
+void Channel::Redeliver::operator()(AckRecord& record) const{
+ record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
+}