diff options
Diffstat (limited to 'cpp/src/qpid/client/BasicMessageChannel.cpp')
-rw-r--r-- | cpp/src/qpid/client/BasicMessageChannel.cpp | 23 |
1 files changed, 11 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp index 91849c735e..60368268c0 100644 --- a/cpp/src/qpid/client/BasicMessageChannel.cpp +++ b/cpp/src/qpid/client/BasicMessageChannel.cpp @@ -100,34 +100,32 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) { c = i->second; consumers.erase(i); } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + } channel.sendAndReceiveSync<BasicCancelOkBody>( synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch))); } void BasicMessageChannel::close(){ - ConsumerMap consumersCopy; - { - Mutex::ScopedLock l(lock); - consumersCopy = consumers; - consumers.clear(); - } destGet.shutdown(); destDispatch.shutdown(); - for (ConsumerMap::iterator i=consumersCopy.begin(); - i != consumersCopy.end(); ++i) +} + +void BasicMessageChannel::cancelAll(){ + Mutex::ScopedLock l(lock); + for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++) { Consumer& c = i->second; - if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) - && c.lastDeliveryTag > 0) + if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); } + channel.send(new BasicCancelBody(channel.version, i->first, true)); } + consumers.clear(); } - bool BasicMessageChannel::get( Message& msg, const Queue& queue, AckMode ackMode) { @@ -324,6 +322,7 @@ void BasicMessageChannel::run() { // Orderly shutdown. } catch (const Exception& e) { + std::cout << "Error caught by dispatch thread: " << e.what() << std::endl; // FIXME aconway 2007-02-20: Report exception to user. QPID_LOG(error, e.what()); } |