summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/BasicMessageChannel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/BasicMessageChannel.cpp')
-rw-r--r--cpp/src/qpid/client/BasicMessageChannel.cpp23
1 files changed, 11 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp
index 91849c735e..60368268c0 100644
--- a/cpp/src/qpid/client/BasicMessageChannel.cpp
+++ b/cpp/src/qpid/client/BasicMessageChannel.cpp
@@ -100,34 +100,32 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) {
c = i->second;
consumers.erase(i);
}
- if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
+ if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) {
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
+ }
channel.sendAndReceiveSync<BasicCancelOkBody>(
synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch)));
}
void BasicMessageChannel::close(){
- ConsumerMap consumersCopy;
- {
- Mutex::ScopedLock l(lock);
- consumersCopy = consumers;
- consumers.clear();
- }
destGet.shutdown();
destDispatch.shutdown();
- for (ConsumerMap::iterator i=consumersCopy.begin();
- i != consumersCopy.end(); ++i)
+}
+
+void BasicMessageChannel::cancelAll(){
+ Mutex::ScopedLock l(lock);
+ for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++)
{
Consumer& c = i->second;
- if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK)
- && c.lastDeliveryTag > 0)
+ if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0)
{
channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true));
}
+ channel.send(new BasicCancelBody(channel.version, i->first, true));
}
+ consumers.clear();
}
-
bool BasicMessageChannel::get(
Message& msg, const Queue& queue, AckMode ackMode)
{
@@ -324,6 +322,7 @@ void BasicMessageChannel::run() {
// Orderly shutdown.
}
catch (const Exception& e) {
+ std::cout << "Error caught by dispatch thread: " << e.what() << std::endl;
// FIXME aconway 2007-02-20: Report exception to user.
QPID_LOG(error, e.what());
}