summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp99
1 files changed, 33 insertions, 66 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index c81e73aba1..523a834715 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -28,7 +28,6 @@
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include "qpid/framing/ChannelAdapter.h"
#include "qpid/QpidError.h"
#include "BrokerAdapter.h"
@@ -50,8 +49,8 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) :
- ChannelAdapter(),
+Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) :
+ id(_id),
connection(con),
currentDeliveryTag(1),
prefetchSize(0),
@@ -62,10 +61,8 @@ Channel::Channel(Connection& con, ChannelId id, MessageStore* const _store) :
store(_store),
messageBuilder(this, _store, connection.getStagingThreshold()),
opened(id == 0),//channel 0 is automatically open, other must be explicitly opened
- flowActive(true),
- adapter(new BrokerAdapter(*this, con, con.broker))
+ flowActive(true)
{
- init(id, con.getOutput(), con.getVersion());
outstanding.reset();
}
@@ -79,14 +76,15 @@ bool Channel::exists(const string& consumerTag){
// TODO aconway 2007-02-12: Why is connection token passed in instead
// of using the channel's parent connection?
-void Channel::consume(string& tagInOut, Queue::shared_ptr queue, bool acks,
+void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut,
+ Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
std::auto_ptr<ConsumerImpl> c(
- new ConsumerImpl(this, tagInOut, queue, connection, acks));
+ new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -195,22 +193,10 @@ void Channel::checkDtxTimeout()
}
}
-void Channel::deliver(
- Message::shared_ptr& msg, const string& consumerTag,
- Queue::shared_ptr& queue, bool ackExpected)
+void Channel::record(const DeliveryRecord& delivery)
{
- Mutex::ScopedLock locker(deliveryLock);
-
- // Key the delivered messages to the id of the request in which they're sent
- uint64_t deliveryTag = getNextSendRequestId();
-
- if(ackExpected){
- unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag));
- outstanding.size += msg->contentSize();
- outstanding.count++;
- }
- //send deliver method, header and content(s)
- msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
+ unacked.push_back(delivery);
+ delivery.addTo(&outstanding);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg){
@@ -220,11 +206,11 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg){
return countOk && sizeOk;
}
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
- Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack
-) : parent(_parent), tag(_tag), queue(_queue), connection(_connection),
- ackExpected(ack), blocked(false) {}
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter,
+ const string& _tag, Queue::shared_ptr _queue,
+ ConnectionToken* const _connection, bool ack
+ ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection),
+ ackExpected(ack), blocked(false) {}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(!connection || connection != msg->getPublisher()){//check for no_local
@@ -232,13 +218,25 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
blocked = true;
}else{
blocked = false;
- parent->deliver(msg, tag, queue, ackExpected);
+ Mutex::ScopedLock locker(parent->deliveryLock);
+
+ uint64_t deliveryTag = adapter->getNextDeliveryTag();
+ if(ackExpected){
+ parent->record(DeliveryRecord(msg, queue, tag, deliveryTag));
+ }
+ adapter->deliver(msg, deliveryTag);
+
return true;
}
}
return false;
}
+void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) {
+ Mutex::ScopedLock locker(parent->deliveryLock);
+ adapter->deliver(msg, deliveryTag);
+}
+
Channel::ConsumerImpl::~ConsumerImpl() {
cancel();
}
@@ -298,10 +296,6 @@ void Channel::complete(Message::shared_ptr msg) {
}
}
-void Channel::ack(){
- ack(getFirstAckRequest(), getLastAckRequest());
-}
-
// Used by Basic
void Channel::ack(uint64_t deliveryTag, bool multiple){
if (multiple)
@@ -365,15 +359,12 @@ void Channel::recover(bool requeue){
}
}
-bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackExpected){
+bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){
Message::shared_ptr msg = queue->dequeue();
if(msg){
Mutex::ScopedLock locker(deliveryLock);
- uint64_t myDeliveryTag = getNextSendRequestId();
- msg->sendGetOk(MethodContext(this, msg->getRespondTo()),
- destination,
- queue->getMessageCount() + 1, myDeliveryTag,
- connection.getFrameMax());
+ uint64_t myDeliveryTag = adapter.getNextDeliveryTag();
+ adapter.deliver(msg, myDeliveryTag);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -386,33 +377,9 @@ bool Channel::get(Queue::shared_ptr queue, const string& destination, bool ackEx
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
uint64_t deliveryTag)
{
- msg->deliver(*this, consumerTag, deliveryTag, connection.getFrameMax());
-}
-
-void Channel::handleMethodInContext(
- boost::shared_ptr<qpid::framing::AMQMethodBody> method,
- const MethodContext& context
-)
-{
- try{
- if(getId() != 0 && !method->isA<ChannelOpenBody>() && !isOpen()) {
- if (!method->isA<ChannelCloseOkBody>()) {
- std::stringstream out;
- out << "Attempt to use unopened channel: " << getId();
- throw ConnectionException(504, out.str());
- }
- } else {
- method->invoke(*adapter, context);
- }
- }catch(ChannelException& e){
- adapter->getProxy().getChannel().close(
- e.code, e.toString(),
- method->amqpClassId(), method->amqpMethodId());
- connection.closeChannel(getId());
- }catch(ConnectionException& e){
- connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
- }catch(std::exception& e){
- connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId());
+ ConsumerImplMap::iterator i = consumers.find(consumerTag);
+ if (i != consumers.end()){
+ i->redeliver(msg, deliveryTag);
}
}