diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 99 |
1 files changed, 33 insertions, 66 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index c81e73aba1..523a834715 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -28,7 +28,6 @@ #include <boost/bind.hpp> #include <boost/format.hpp> -#include "qpid/framing/ChannelAdapter.h" #include "qpid/QpidError.h" #include "BrokerAdapter.h" @@ -50,8 +49,8 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) : - ChannelAdapter(), +Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) : + id(_id), connection(con), currentDeliveryTag(1), prefetchSize(0), @@ -62,10 +61,8 @@ Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) : store(_store), messageBuilder(this, _store, connection.getStagingThreshold()), opened(id == 0),//channel 0 is automatically open, other must be explicitly opened - flowActive(true), - adapter(new BrokerAdapter(*this, con, con.broker)) + flowActive(true) { - init(id, con.getOutput(), con.getVersion()); outstanding.reset(); } @@ -79,14 +76,15 @@ bool Channel::exists(const string& consumerTag){ // TODO aconway 2007-02-12: Why is connection token passed in instead // of using the channel's parent connection? -void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, +void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, + Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, tagInOut, queue, connection, acks)); + new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -195,22 +193,10 @@ void Channel::checkDtxTimeout() } } -void Channel::deliver( - Message::shared_ptr& msg, const string& consumerTag, - Queue::shared_ptr& queue, bool ackExpected) +void Channel::record(const DeliveryRecord& delivery) { - Mutex::ScopedLock locker(deliveryLock); - - // Key the delivered messages to the id of the request in which they're sent - uint64_t deliveryTag = getNextSendRequestId(); - - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); - outstanding.size += msg->contentSize(); - outstanding.count++; - } - //send deliver method, header and content(s) - msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax()); + unacked.push_back(delivery); + delivery.addTo(&outstanding); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ @@ -220,11 +206,11 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){ return countOk && sizeOk; } -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack -) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), - ackExpected(ack), blocked(false) {} +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter, + const string& _tag, Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack + ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection), + ackExpected(ack), blocked(false) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local @@ -232,13 +218,25 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ blocked = true; }else{ blocked = false; - parent->deliver(msg, tag, queue, ackExpected); + Mutex::ScopedLock locker(parent->deliveryLock); + + uint64_t deliveryTag = adapter->getNextDeliveryTag(); + if(ackExpected){ + parent->record(DeliveryRecord(msg, queue, tag, deliveryTag)); + } + adapter->deliver(msg, deliveryTag); + return true; } } return false; } +void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) { + Mutex::ScopedLock locker(parent->deliveryLock); + adapter->deliver(msg, deliveryTag); +} + Channel::ConsumerImpl::~ConsumerImpl() { cancel(); } @@ -298,10 +296,6 @@ void Channel::complete(Message::shared_ptr msg) { } } -void Channel::ack(){ - ack(getFirstAckRequest(), getLastAckRequest()); -} - // Used by Basic void Channel::ack(uint64_t deliveryTag, bool multiple){ if (multiple) @@ -365,15 +359,12 @@ void Channel::recover(bool requeue){ } } -bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){ +bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = getNextSendRequestId(); - msg->sendGetOk(MethodContext(this, msg->getRespondTo()), - destination, - queue->getMessageCount() + 1, myDeliveryTag, - connection.getFrameMax()); + uint64_t myDeliveryTag = adapter.getNextDeliveryTag(); + adapter.deliver(msg, myDeliveryTag); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -386,33 +377,9 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag) { - msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax()); -} - -void Channel::handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context -) -{ - try{ - if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { - if (!method->isA<ChannelCloseOkBody>()) { - std::stringstream out; - out << "Attempt to use unopened channel: " << getId(); - throw ConnectionException(504, out.str()); - } - } else { - method->invoke(*adapter, context); - } - }catch(ChannelException& e){ - adapter->getProxy().getChannel().close( - e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(ConnectionException& e){ - connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + ConsumerImplMap::iterator i = consumers.find(consumerTag); + if (i != consumers.end()){ + i->redeliver(msg, deliveryTag); } } |