summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp242
1 files changed, 181 insertions, 61 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index c50fbd5559..6e577ab354 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -53,7 +53,6 @@ Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageS
id(_id),
connection(con),
out(_out),
- currentDeliveryTag(1),
prefetchSize(0),
prefetchCount(0),
tagGenerator("sgen"),
@@ -75,17 +74,13 @@ bool Channel::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
-// TODO aconway 2007-02-12: Why is connection token passed in instead
-// of using the channel's parent connection?
void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut,
- Queue::shared_ptr queue, bool acks,
- bool exclusive, ConnectionToken* const connection,
- const FieldTable*)
+ Queue::shared_ptr queue, bool nolocal, bool acks,
+ bool exclusive, const FieldTable*)
{
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
- std::auto_ptr<ConsumerImpl> c(
- new ConsumerImpl(this, token, tagInOut, queue, connection, acks));
+ std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -210,7 +205,7 @@ void Channel::checkDtxTimeout()
void Channel::record(const DeliveryRecord& delivery)
{
unacked.push_back(delivery);
- delivery.addTo(&outstanding);
+ delivery.addTo(outstanding);
}
bool Channel::checkPrefetch(Message::shared_ptr& msg)
@@ -221,33 +216,61 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg)
return countOk && sizeOk;
}
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token,
- const string& _tag, Queue::shared_ptr _queue,
- ConnectionToken* const _connection, bool ack
- ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection),
- ackExpected(ack), blocked(false) {}
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent,
+ DeliveryToken::shared_ptr _token,
+ const string& _name,
+ Queue::shared_ptr _queue,
+ bool ack,
+ bool _nolocal
+ ) : parent(_parent),
+ token(_token),
+ name(_name),
+ queue(_queue),
+ ackExpected(ack),
+ nolocal(_nolocal),
+ blocked(false),
+ windowing(true),
+ msgCredit(0xFFFFFFFF),
+ byteCredit(0xFFFFFFFF) {}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg)
{
- if(!connection || connection != msg->getPublisher()){//check for no_local
- if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){
+ if (nolocal && &(parent->connection) == msg->getPublisher()) {
+ return false;
+ } else {
+ if (!checkCredit(msg) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))) {
blocked = true;
- }else{
+ } else {
blocked = false;
+
Mutex::ScopedLock locker(parent->deliveryLock);
- uint64_t deliveryTag = parent->out.deliver(msg, token);
- if(ackExpected){
- parent->record(DeliveryRecord(msg, queue, tag, deliveryTag));
+ DeliveryId deliveryTag = parent->out.deliver(msg, token);
+ if (ackExpected) {
+ parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
}
+ }
+ return !blocked;
+ }
+}
- return true;
+bool Channel::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+{
+ Mutex::ScopedLock l(lock);
+ if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
+ return false;
+ } else {
+ if (msgCredit != 0xFFFFFFFF) {
+ msgCredit--;
+ }
+ if (byteCredit != 0xFFFFFFFF) {
+ byteCredit -= msg->getRequiredCredit();
}
+ return true;
}
- return false;
}
-void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) {
+void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) {
Mutex::ScopedLock locker(parent->deliveryLock);
parent->out.redeliver(msg, token, deliveryTag);
}
@@ -326,55 +349,71 @@ void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
}
-// Used by Basic
-void Channel::ack(uint64_t deliveryTag, bool multiple)
+void Channel::ackCumulative(DeliveryId id)
{
- if (multiple)
- ack(0, deliveryTag);
- else
- ack(deliveryTag, deliveryTag);
+ ack(id, id, true);
}
-void Channel::ack(uint64_t firstTag, uint64_t lastTag)
+void Channel::ackRange(DeliveryId first, DeliveryId last)
{
+ ack(first, last, false);
+}
+
+void Channel::ack(DeliveryId first, DeliveryId last, bool cumulative)
+{
+ Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
+ ack_iterator start = cumulative ? unacked.begin() :
+ find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first));
+ ack_iterator end = start;
+
+ if (cumulative || first != last) {
+ //need to find end (position it just after the last record in range)
+ end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last));
+ } else {
+ //just acked single element (move end past it)
+ ++end;
+ }
+
+ for_each(start, end, boost::bind(&Channel::acknowledged, this, _1));
+
if (txBuffer.get()) {
- accumulatedAck.update(firstTag, lastTag);
- //TODO: I think the outstanding prefetch size & count should be updated at this point...
- //TODO: ...this may then necessitate dispatching to consumers
+ //in transactional mode, don't dequeue or remove, just
+ //maintain set of acknowledged messages:
+ accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last);
+
if (dtxBuffer.get()) {
+ //if enlisted in a dtx, remove the relevant slice from
+ //unacked and record it against that transaction
TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
accumulatedAck.clear();
dtxBuffer->enlist(txAck);
}
-
} else {
- Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
+ for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0));
+ unacked.erase(start, end);
+ }
- ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
- ack_iterator j = (firstTag == 0) ?
- unacked.begin() :
- find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
-
- if(i == unacked.end()){
- throw ConnectionException(530, "Received ack for unrecognised delivery tag");
- }else if(i!=j){
- ack_iterator end = ++i;
- for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0));
- unacked.erase(unacked.begin(), end);
-
- //recalculate the prefetch:
- outstanding.reset();
- for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding));
- }else{
- i->discard();
- i->subtractFrom(&outstanding);
- unacked.erase(i);
- }
+ //if the prefetch limit had previously been reached, or credit
+ //had expired in windowing mode there may be messages that can
+ //be now be delivered
+ for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
+}
+
+void Channel::acknowledged(const DeliveryRecord& delivery)
+{
+ delivery.subtractFrom(outstanding);
+ ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag());
+ if (i != consumers.end()) {
+ i->acknowledged(delivery);
+ }
+}
- //if the prefetch limit had previously been reached, there may
- //be messages that can be now be delivered
- std::for_each(consumers.begin(), consumers.end(),
- boost::bind(&ConsumerImpl::requestDispatch, _1));
+void Channel::ConsumerImpl::acknowledged(const DeliveryRecord& delivery)
+{
+ if (windowing) {
+ if (msgCredit != 0xFFFFFFFF) msgCredit++;
+ if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit);
}
}
@@ -384,6 +423,8 @@ void Channel::recover(bool requeue)
if(requeue){
outstanding.reset();
+ //take copy and clear unacked as requeue may result in redelivery to this channel
+ //which will in turn result in additions to unacked
std::list<DeliveryRecord> copy = unacked;
unacked.clear();
for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue));
@@ -397,7 +438,7 @@ bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool
Message::shared_ptr msg = queue->dequeue();
if(msg){
Mutex::ScopedLock locker(deliveryLock);
- uint64_t myDeliveryTag = out.deliver(msg, token);
+ DeliveryId myDeliveryTag = out.deliver(msg, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}
@@ -408,7 +449,7 @@ bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool
}
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag,
- uint64_t deliveryTag)
+ DeliveryId deliveryTag)
{
ConsumerImplMap::iterator i = consumers.find(consumerTag);
if (i != consumers.end()){
@@ -426,3 +467,82 @@ void Channel::flow(bool active)
std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1));
}
}
+
+
+Channel::ConsumerImpl& Channel::find(const std::string& destination)
+{
+ ConsumerImplMap::iterator i = consumers.find(destination);
+ if (i == consumers.end()) {
+ throw ChannelException(404, boost::format("Unknown destination %1%") % destination);
+ } else {
+ return *i;
+ }
+}
+
+void Channel::setWindowMode(const std::string& destination)
+{
+ find(destination).setWindowMode();
+}
+
+void Channel::setCreditMode(const std::string& destination)
+{
+ find(destination).setCreditMode();
+}
+
+void Channel::addByteCredit(const std::string& destination, uint32_t value)
+{
+ find(destination).addByteCredit(value);
+}
+
+
+void Channel::addMessageCredit(const std::string& destination, uint32_t value)
+{
+ find(destination).addMessageCredit(value);
+}
+
+void Channel::flush(const std::string& destination)
+{
+ ConsumerImpl& c = find(destination);
+ c.flush();
+}
+
+
+void Channel::stop(const std::string& destination)
+{
+ find(destination).stop();
+}
+
+void Channel::ConsumerImpl::setWindowMode()
+{
+ windowing = true;
+}
+
+void Channel::ConsumerImpl::setCreditMode()
+{
+ windowing = false;
+}
+
+void Channel::ConsumerImpl::addByteCredit(uint32_t value)
+{
+ byteCredit += value;
+ requestDispatch();
+}
+
+void Channel::ConsumerImpl::addMessageCredit(uint32_t value)
+{
+ msgCredit += value;
+ requestDispatch();
+}
+
+void Channel::ConsumerImpl::flush()
+{
+ //TODO: need to wait until any messages that are available for
+ //this consumer have been delivered... i.e. some sort of flush on
+ //the queue...
+}
+
+void Channel::ConsumerImpl::stop()
+{
+ msgCredit = 0;
+ byteCredit = 0;
+}