diff options
author | Gordon Sim <gsim@apache.org> | 2006-09-27 13:21:43 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-09-27 13:21:43 +0000 |
commit | 72aff4cc22932bd98b542ac9dd6393e23985e312 (patch) | |
tree | d3159aa7eeba203fea961b7397809ad805c74e1a /cpp/broker/src/Channel.cpp | |
parent | f1745fde1a2164878794f0bcf62fa62aacbe5cae (diff) | |
download | qpid-python-72aff4cc22932bd98b542ac9dd6393e23985e312.tar.gz |
Initial implementation of basic_ack & basic_recover
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450434 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/src/Channel.cpp')
-rw-r--r-- | cpp/broker/src/Channel.cpp | 61 |
1 files changed, 53 insertions, 8 deletions
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index f1f7d63a39..b49635e026 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -25,6 +25,7 @@ using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::concurrent; + Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out), id(_id), framesize(_framesize), @@ -46,7 +47,7 @@ bool Channel::exists(string& consumerTag){ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){ if(tag.empty()) tag = tagGenerator.generate(); - ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection)); + ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); try{ queue->consume(c, exclusive);//may throw exception consumers[tag] = c; @@ -92,22 +93,29 @@ void Channel::rollback(){ } -void Channel::deliver(Message::shared_ptr& msg, string& consumerTag){ +void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ + Locker locker(deliveryLock); + + u_int64_t myDeliveryTag = deliveryTag++; + if(ackExpected){ + unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); + } //send deliver method, header and content(s) - msg->deliver(out, id, consumerTag, deliveryTag++, framesize); + msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); } Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, Queue::shared_ptr _queue, - ConnectionToken* const _connection) : parent(_parent), - tag(_tag), - queue(_queue), - connection(_connection){ + ConnectionToken* const _connection, bool ack) : parent(_parent), + tag(_tag), + queue(_queue), + connection(_connection), + ackExpected(ack){ } bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(connection != msg->getPublisher()){ - parent->deliver(msg, tag); + parent->deliver(msg, tag, queue, ackExpected); return true; }else{ return false; @@ -151,3 +159,40 @@ void Channel::publish(ExchangeRegistry* exchanges){ } message.reset(); } + +void Channel::ack(u_int64_t deliveryTag, bool multiple){ + ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(deliveryTag)); + if(i == unacknowledged.end()){ + //error: how should this be signalled? + }else if(multiple){ + unacknowledged.erase(unacknowledged.begin(), ++i); + }else{ + unacknowledged.erase(i); + } +} + +void Channel::recover(bool requeue){ + if(requeue){ + //TODO: need to set redelivered flag + for_each(unacknowledged.begin(), unacknowledged.end(), Requeue()); + unacknowledged.clear(); + }else{ + for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this)); + } +} + +Channel::MatchAck::MatchAck(u_int64_t _tag) : tag(_tag) {} + +bool Channel::MatchAck::operator()(AckRecord& record) const{ + return tag == record.deliveryTag; +} + +void Channel::Requeue::operator()(AckRecord& record) const{ + record.queue->deliver(record.msg); +} + +Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} + +void Channel::Redeliver::operator()(AckRecord& record) const{ + record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); +} |