summaryrefslogtreecommitdiff
path: root/cpp/lib/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-05-09 17:00:32 +0000
committerGordon Sim <gsim@apache.org>2007-05-09 17:00:32 +0000
commit3a87c67be419a3ae74ea456ae67be5d0f2d2ec92 (patch)
tree82f646b4394a31a6baa669f699a775454afadf36 /cpp/lib/broker/BrokerChannel.cpp
parente6fd98ab0f78c0b91c4b12075ffdb93bce2c4c0f (diff)
downloadqpid-python-3a87c67be419a3ae74ea456ae67be5d0f2d2ec92.tar.gz
* Added support for channel.flow:
cpp/tests/ChannelTest.cpp cpp/lib/broker/SessionHandlerImpl.cpp cpp/lib/broker/BrokerChannel.h cpp/lib/broker/BrokerChannel.cpp * Fixed client connection closing process: cpp/lib/common/sys/apr/Socket.cpp cpp/lib/client/Connector.h cpp/lib/client/Connector.cpp cpp/lib/client/Connection.h cpp/lib/client/Connection.cpp * Use amq.direct rather than default exchange in P2P test (to interop with java) cpp/tests/BasicP2Ptest.h git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@536584 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/lib/broker/BrokerChannel.cpp')
-rw-r--r--cpp/lib/broker/BrokerChannel.cpp17
1 files changed, 15 insertions, 2 deletions
diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp
index 65aa50d3ac..d8fbdc467c 100644
--- a/cpp/lib/broker/BrokerChannel.cpp
+++ b/cpp/lib/broker/BrokerChannel.cpp
@@ -43,7 +43,8 @@ Channel::Channel(qpid::framing::ProtocolVersion& _version, OutputHandler* _out,
accumulatedAck(0),
store(_store),
messageBuilder(this, _store, _stagingThreshold),
- version(_version){
+ version(_version),
+ flowActive(true){
outstanding.reset();
}
@@ -142,7 +143,7 @@ Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
if(!connection || connection != msg->getPublisher()){//check for no_local
- if(ackExpected && !parent->checkPrefetch(msg)){
+ if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){
blocked = true;
}else{
blocked = false;
@@ -257,3 +258,15 @@ bool Channel::get(Queue::shared_ptr queue, bool ackExpected){
void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){
msg->deliver(out, id, consumerTag, deliveryTag, framesize, &version);
}
+
+void Channel::flow(bool active){
+ Mutex::ScopedLock locker(deliveryLock);
+ bool requestDelivery(!flowActive && active);
+ flowActive = active;
+ if (requestDelivery) {
+ //there may be messages that can be now be delivered
+ for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){
+ j->second->requestDispatch();
+ }
+ }
+}