diff options
author | Gordon Sim <gsim@apache.org> | 2007-09-18 16:53:34 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-09-18 16:53:34 +0000 |
commit | b294db7e778c8476c26401fe86ebb6a795694194 (patch) | |
tree | 82195eeae289572e9c736c62baee7a7fef713213 /cpp/src/qpid/client/ClientChannel.cpp | |
parent | e0bf5acc51a983b2cb5c3d959b513d21a2cb57c1 (diff) | |
download | qpid-python-b294db7e778c8476c26401fe86ebb6a795694194.tar.gz |
Use credit mode when using NO_ACK and prefetch is not set.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@576976 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r-- | cpp/src/qpid/client/ClientChannel.cpp | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 0a85b8e4f0..f5362bf688 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -90,12 +90,12 @@ void Channel::setPrefetch(uint32_t _prefetch){ void Channel::declareExchange(Exchange& _exchange, bool synch){ ScopedSync s(session, synch); - session.exchangeDeclare((exchange=_exchange.getName(), type=_exchange.getType())); + session.exchangeDeclare_(exchange=_exchange.getName(), type=_exchange.getType()); } void Channel::deleteExchange(Exchange& _exchange, bool synch){ ScopedSync s(session, synch); - session.exchangeDelete((exchange=_exchange.getName(), ifUnused=false)); + session.exchangeDelete_(exchange=_exchange.getName(), ifUnused=false); } void Channel::declareQueue(Queue& _queue, bool synch){ @@ -106,14 +106,14 @@ void Channel::declareQueue(Queue& _queue, bool synch){ } ScopedSync s(session, synch); - session.queueDeclare((queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(), - exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete())); + session.queueDeclare_(queue=_queue.getName(), passive=false/*passive*/, durable=_queue.isDurable(), + exclusive=_queue.isExclusive(), autoDelete=_queue.isAutoDelete()); } void Channel::deleteQueue(Queue& _queue, bool ifunused, bool ifempty, bool synch){ ScopedSync s(session, synch); - session.queueDelete((queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty)); + session.queueDelete_(queue=_queue.getName(), ifUnused=ifunused, ifEmpty=ifempty); } void Channel::bind(const Exchange& exchange, const Queue& queue, const std::string& key, const FieldTable& args, bool synch){ @@ -153,6 +153,10 @@ void Channel::consume( session.messageSubscribe(0, _queue.getName(), tag, noLocal, confirmMode, 0/*pre-acquire*/, false, fields ? *fields : FieldTable()); + if (!prefetch) { + session.messageFlowMode(tag, 0/*credit based*/); + } + //allocate some credit: session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); session.messageFlow(tag, 0/*MESSAGES*/, prefetch ? prefetch : 0xFFFFFFFF); @@ -177,7 +181,7 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { ScopedDivert handler(tag, session.execution().getDemux()); Demux::Queue& incoming = handler.getQueue(); - session.messageSubscribe((destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1))); + session.messageSubscribe_(destination=tag, queue=_queue.getName(), confirmMode=(ackMode == NO_ACK ? 0 : 1)); session.messageFlow(tag, 1/*BYTES*/, 0xFFFFFFFF); session.messageFlow(tag, 0/*MESSAGES*/, 1); Completion status = session.messageFlush(tag); @@ -188,6 +192,7 @@ bool Channel::get(Message& msg, const Queue& _queue, AckMode ackMode) { return false; } else { msg.populate(*(incoming.pop())); + if (ackMode == AUTO_ACK) msg.acknowledge(session, false, true); return true; } } @@ -198,7 +203,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, msg.getDeliveryProperties().setRoutingKey(routingKey); msg.getDeliveryProperties().setDiscardUnroutable(!mandatory); - session.messageTransfer((destination=exchange.getName(), content=msg)); + session.messageTransfer_(destination=exchange.getName(), content=msg); } void Channel::close() @@ -238,7 +243,7 @@ void Channel::dispatch(FrameSet& content, const std::string& destination) MessageListener* listener = i->second.listener; listener->received(msg); if (isOpen() && i->second.ackMode != CLIENT_ACK) { - bool send = i->second.ackMode == AUTO_ACK + bool send = i->second.ackMode == AUTO_ACK || (prefetch && ++(i->second.count) > (prefetch / 2)); if (send) i->second.count = 0; session.execution().completed(content.getId(), true, send); |