summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/ClientChannel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-13 17:29:16 +0000
committerGordon Sim <gsim@apache.org>2007-09-13 17:29:16 +0000
commit0a1b3430450f274aee273a9f792a2d43f771b85f (patch)
tree71be3bc1a920a568c0680f8e8a5e802c1c3bee8d /cpp/src/qpid/client/ClientChannel.cpp
parente00a1cfa3881e3bb8aadfecdf502f17903e319b1 (diff)
downloadqpid-python-0a1b3430450f274aee273a9f792a2d43f771b85f.tar.gz
Use frameset begin/end flags for determining frameset boundaries.
Set frameset & segment begin/end flags for content bearing methods (i.e. messages). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@575377 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/ClientChannel.cpp')
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp28
1 files changed, 13 insertions, 15 deletions
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 87062e1470..0a85b8e4f0 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -69,7 +69,7 @@ Channel::~Channel()
void Channel::open(const Session& s)
{
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
if (isOpen())
THROW_QPID_ERROR(INTERNAL_ERROR, "Attempt to re-open channel");
active = true;
@@ -80,7 +80,7 @@ void Channel::open(const Session& s)
}
bool Channel::isOpen() const {
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
return active;
}
@@ -146,7 +146,7 @@ void Channel::consume(
Consumer& c = consumers[tag];
c.listener = listener;
c.ackMode = ackMode;
- c.lastDeliveryTag = 0;
+ c.count = 0;
}
uint8_t confirmMode = ackMode == NO_ACK ? 0 : 1;
ScopedSync s(session, synch);
@@ -205,7 +205,7 @@ void Channel::close()
{
session.close();
{
- Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(stopLock);
active = false;
}
stop();
@@ -231,20 +231,18 @@ void Channel::join() {
void Channel::dispatch(FrameSet& content, const std::string& destination)
{
- MessageListener* listener(0);
- {
- Mutex::ScopedLock l(lock);
- ConsumerMap::iterator i = consumers.find(destination);
- if (i != consumers.end()) {
- Message msg;
- msg.populate(content);
- listener = i->second.listener;
- }
- }
- if (listener) {
+ ConsumerMap::iterator i = consumers.find(destination);
+ if (i != consumers.end()) {
Message msg;
msg.populate(content);
+ MessageListener* listener = i->second.listener;
listener->received(msg);
+ if (isOpen() && i->second.ackMode != CLIENT_ACK) {
+ bool send = i->second.ackMode == AUTO_ACK
+ || (prefetch && ++(i->second.count) > (prefetch / 2));
+ if (send) i->second.count = 0;
+ session.execution().completed(content.getId(), true, send);
+ }
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << destination);
}