summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/BasicMessageChannel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-05 09:47:07 +0000
committerGordon Sim <gsim@apache.org>2007-07-05 09:47:07 +0000
commit07c8c499649c725a226eeda3e0bfe58fa8ba984c (patch)
tree0f71fc80b9e6e9929184334f4dc7d8fc03f7ccc0 /cpp/src/qpid/client/BasicMessageChannel.cpp
parentd4be469092c558ca9031d82b963b8b845fa1e1bd (diff)
downloadqpid-python-07c8c499649c725a226eeda3e0bfe58fa8ba984c.tar.gz
Fix for QPID-534. Get now detects closure correctly. Also fixed broker to allow channel.close-ok (and fixed client to send it).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@553441 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/BasicMessageChannel.cpp')
-rw-r--r--cpp/src/qpid/client/BasicMessageChannel.cpp23
1 files changed, 11 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp
index 91849c735e..60368268c0 100644
--- a/cpp/src/qpid/client/BasicMessageChannel.cpp
+++ b/cpp/src/qpid/client/BasicMessageChannel.cpp
@@ -100,34 +100,32 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
c = i->second;
consumers.erase(i);
}
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) {
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ }
channel.sendAndReceiveSync<BasicCancelOkBody>(
synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
}
void BasicMessageChannel::close(){
- ConsumerMap consumersCopy;
- {
- Mutex::ScopedLock l(lock);
- consumersCopy = consumers;
- consumers.clear();
- }
destGet.shutdown();
destDispatch.shutdown();
- for (ConsumerMap::iterator i=consumersCopy.begin();
- i != consumersCopy.end(); ++i)
+}
+
+void BasicMessageChannel::cancelAll(){
+ Mutex::ScopedLock l(lock);
+ for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++)
{
Consumer& c = i->second;
- if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
- && c.lastDeliveryTag > 0)
+ if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
{
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
}
+ channel.send(new BasicCancelBody(channel.version, i->first, true));
}
+ consumers.clear();
}
-
bool BasicMessageChannel::get(
Message& msg, const Queue& queue, AckMode ackMode)
{
@@ -324,6 +322,7 @@ void BasicMessageChannel::run() {
// Orderly shutdown.
}
catch (const Exception& e) {
+ std::cout << "Error caught by dispatch thread: " << e.what() << std::endl;
// FIXME aconway 2007-02-20: Report exception to user.
QPID_LOG(error, e.what());
}