summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp28
1 files changed, 13 insertions, 15 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 87062e1470..0a85b8e4f0 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -69,7 +69,7 @@ Channel::~Channel()
void Channel::open(const Session& s)
{
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
active = true;
@@ -80,7 +80,7 @@ void Channel::open(const Session& s)
}
bool Channel::isOpen() const {
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
return active;
}
@@ -146,7 +146,7 @@ void Channel::consume(
Consumer& c = consumers[tag];
c.listener = listener;
c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
+ c.count = 0;
}
uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
ScopedSync s(session, synch);
@@ -205,7 +205,7 @@ void Channel::close()
{
session.close();
{
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
active = false;
}
stop();
@@ -231,20 +231,18 @@ void Channel::join() {
void Channel::dispatch(FrameSet& content, const std::string& destination)
{
- MessageListener* listener(0);
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(destination);
- if (i != consumers.end()) {
- Message msg;
- msg.populate(content);
- listener = i->second.listener;
- }
- }
- if (listener) {
+ ConsumerMap::iterator i = consumers.find(destination);
+ if (i != consumers.end()) {
Message msg;
msg.populate(content);
+ MessageListener* listener = i->second.listener;
listener->received(msg);
+ if (isOpen() && i->second.ackMode != CLIENT_ACK) {
+ bool send = i->second.ackMode == AUTO_ACK
+ || (prefetch && ++(i->second.count) > (prefetch / 2));
+ if (send) i->second.count = 0;
+ session.execution().completed(content.getId(), true, send);
+ }
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
}