summaryrefslogtreecommitdiff
path: root/cpp/lib/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/lib/broker')
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp17
-rw-r--r--cpp/lib/broker/BrokerChannel.h2
-rw-r--r--cpp/lib/broker/SessionHandlerImpl.cpp9
3 files changed, 21 insertions, 7 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 65aa50d3ac..d8fbdc467c 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -43,7 +43,8 @@ Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out,
accumulatedAck(0),
store(_store),
messageBuilder(this, _store, _stagingThreshold),
- version(_version){
+ version(_version),
+ flowActive(true){
outstanding.reset();
}
@@ -142,7 +143,7 @@ Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(!connection || connection != msg->getPublisher()){//check for no_local
- if(ackExpected && !parent->checkPrefetch(msg)){
+ if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){
blocked = true;
}else{
blocked = false;
@@ -257,3 +258,15 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
}
+
+void Channel::flow(bool active){
+ Mutex::ScopedLock locker(deliveryLock);
+ bool requestDelivery(!flowActive && active);
+ flowActive = active;
+ if (requestDelivery) {
+ //there may be messages that can be now be delivered
+ for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+ j->second->requestDispatch();
+ }
+ }
+}
diff --git a/cpp/lib/broker/BrokerChannel.h b/cpp/lib/broker/BrokerChannel.h
index 888ca3c051..be40a25013 100644
--- a/cpp/lib/broker/BrokerChannel.h
+++ b/cpp/lib/broker/BrokerChannel.h
@@ -89,6 +89,7 @@ namespace qpid {
MessageBuilder messageBuilder;//builder for in-progress message
Exchange::shared_ptr exchange;//exchange to which any in-progress message was published to
qpid::framing::ProtocolVersion version; // version used for this channel
+ bool flowActive;
virtual void complete(Message::shared_ptr& msg);
void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected);
@@ -118,6 +119,7 @@ namespace qpid {
void handlePublish(Message* msg, Exchange::shared_ptr exchange);
void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
void handleContent(qpid::framing::AMQContentBody::shared_ptr content);
+ void flow(bool active);
};
struct InvalidAckException{};
diff --git a/cpp/lib/broker/SessionHandlerImpl.cpp b/cpp/lib/broker/SessionHandlerImpl.cpp
index 49767f4c71..b91bee5a4b 100644
--- a/cpp/lib/broker/SessionHandlerImpl.cpp
+++ b/cpp/lib/broker/SessionHandlerImpl.cpp
@@ -233,12 +233,11 @@ void SessionHandlerImpl::ChannelHandlerImpl::open(u_int16_t channel, const strin
}
}
-void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t /*channel*/, bool /*active*/){
-
+void SessionHandlerImpl::ChannelHandlerImpl::flow(u_int16_t channel, bool active){
+ parent->getChannel(channel)->flow(active);
+ parent->client->getChannel().flowOk(channel, active);
}
-void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){
-
-}
+void SessionHandlerImpl::ChannelHandlerImpl::flowOk(u_int16_t /*channel*/, bool /*active*/){}
void SessionHandlerImpl::ChannelHandlerImpl::close(u_int16_t channel, u_int16_t /*replyCode*/, const string& /*replyText*/,
u_int16_t /*classId*/, u_int16_t /*methodId*/){