diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-17 08:28:48 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-17 08:28:48 +0000 |
commit | ce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d (patch) | |
tree | ea71b96a92eb5402b71a4c08312fbe1d8b835bbc /cpp/src/qpid/broker/BrokerChannel.cpp | |
parent | 54b8fe305e87f623bbeb2c50bea20a332f71a983 (diff) | |
download | qpid-python-ce9743f8f1640d42af5fe7aaa8fe7e3ca82a914d.tar.gz |
Some refactoring towards a more decoupled handler chain structure:
* Connection no longer depends on Channel; it contains a map of
FrameHandler::Chains. (The construction of the chains still refers
to specific handlers).
* Channel is no longer tied to ChannelAdapter through inheritance. The
former is independent of any particular handler chain or protocol
version, the latter is still used by ConnectionAdapter and
SemanticHandler in the 0-9 chain.
* A DeliveryAdapter interface has been introduced as part of the
separation of ChannelAdapter from Channel. This is intended to adapt
from a version independent core to version specific mechanisms for
sending messages. i.e. it fulfills the same role for outputs that
e.g. BrokerAdapter does for inputs. (Its not perfect yet by any
means but is a step on the way to the correct model I think).
* The connection related methods sent over channel zero are
implemented in their own adapter (ConnectionAdapter), and are
entirely separate from the semantic layer. The channel control
methods are still bundled with the proper semantic layer methods;
they too can be separated but would have to share the request id
with the semantic method handler due to the nature of the 0-9 WIP.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@556846 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 99 |
1 files changed, 33 insertions, 66 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index c81e73aba1..523a834715 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -28,7 +28,6 @@ #include <boost/bind.hpp> #include <boost/format.hpp> -#include "qpid/framing/ChannelAdapter.h" #include "qpid/QpidError.h" #include "BrokerAdapter.h" @@ -50,8 +49,8 @@ using namespace qpid::framing; using namespace qpid::sys; -Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) : - ChannelAdapter(), +Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) : + id(_id), connection(con), currentDeliveryTag(1), prefetchSize(0), @@ -62,10 +61,8 @@ Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) : store(_store), messageBuilder(this, _store, connection.getStagingThreshold()), opened(id == 0),//channel 0 is automatically open, other must be explicitly opened - flowActive(true), - adapter(new BrokerAdapter(*this, con, con.broker)) + flowActive(true) { - init(id, con.getOutput(), con.getVersion()); outstanding.reset(); } @@ -79,14 +76,15 @@ bool Channel::exists(const string& consumerTag){ // TODO aconway 2007-02-12: Why is connection token passed in instead // of using the channel's parent connection? -void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks, +void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut, + Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection, const FieldTable*) { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, tagInOut, queue, connection, acks)); + new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -195,22 +193,10 @@ void Channel::checkDtxTimeout() } } -void Channel::deliver( - Message::shared_ptr& msg, const string& consumerTag, - Queue::shared_ptr& queue, bool ackExpected) +void Channel::record(const DeliveryRecord& delivery) { - Mutex::ScopedLock locker(deliveryLock); - - // Key the delivered messages to the id of the request in which they're sent - uint64_t deliveryTag = getNextSendRequestId(); - - if(ackExpected){ - unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); - outstanding.size += msg->contentSize(); - outstanding.count++; - } - //send deliver method, header and content(s) - msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax()); + unacked.push_back(delivery); + delivery.addTo(&outstanding); } bool Channel::checkPrefetch(Message::shared_ptr& msg){ @@ -220,11 +206,11 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){ return countOk && sizeOk; } -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag, - Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack -) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), - ackExpected(ack), blocked(false) {} +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter, + const string& _tag, Queue::shared_ptr _queue, + ConnectionToken* const _connection, bool ack + ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection), + ackExpected(ack), blocked(false) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ if(!connection || connection != msg->getPublisher()){//check for no_local @@ -232,13 +218,25 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ blocked = true; }else{ blocked = false; - parent->deliver(msg, tag, queue, ackExpected); + Mutex::ScopedLock locker(parent->deliveryLock); + + uint64_t deliveryTag = adapter->getNextDeliveryTag(); + if(ackExpected){ + parent->record(DeliveryRecord(msg, queue, tag, deliveryTag)); + } + adapter->deliver(msg, deliveryTag); + return true; } } return false; } +void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) { + Mutex::ScopedLock locker(parent->deliveryLock); + adapter->deliver(msg, deliveryTag); +} + Channel::ConsumerImpl::~ConsumerImpl() { cancel(); } @@ -298,10 +296,6 @@ void Channel::complete(Message::shared_ptr msg) { } } -void Channel::ack(){ - ack(getFirstAckRequest(), getLastAckRequest()); -} - // Used by Basic void Channel::ack(uint64_t deliveryTag, bool multiple){ if (multiple) @@ -365,15 +359,12 @@ void Channel::recover(bool requeue){ } } -bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){ +bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){ Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = getNextSendRequestId(); - msg->sendGetOk(MethodContext(this, msg->getRespondTo()), - destination, - queue->getMessageCount() + 1, myDeliveryTag, - connection.getFrameMax()); + uint64_t myDeliveryTag = adapter.getNextDeliveryTag(); + adapter.deliver(msg, myDeliveryTag); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -386,33 +377,9 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag) { - msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax()); -} - -void Channel::handleMethodInContext( - boost::shared_ptr<qpid::framing::AMQMethodBody> method, - const MethodContext& context -) -{ - try{ - if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) { - if (!method->isA<ChannelCloseOkBody>()) { - std::stringstream out; - out << "Attempt to use unopened channel: " << getId(); - throw ConnectionException(504, out.str()); - } - } else { - method->invoke(*adapter, context); - } - }catch(ChannelException& e){ - adapter->getProxy().getChannel().close( - e.code, e.toString(), - method->amqpClassId(), method->amqpMethodId()); - connection.closeChannel(getId()); - }catch(ConnectionException& e){ - connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); + ConsumerImplMap::iterator i = consumers.find(consumerTag); + if (i != consumers.end()){ + i->redeliver(msg, deliveryTag); } } |