diff options
author | Gordon Sim <gsim@apache.org> | 2007-07-05 09:47:07 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-07-05 09:47:07 +0000 |
commit | 07c8c499649c725a226eeda3e0bfe58fa8ba984c (patch) | |
tree | 0f71fc80b9e6e9929184334f4dc7d8fc03f7ccc0 /cpp/src/qpid/client/BasicMessageChannel.cpp | |
parent | d4be469092c558ca9031d82b963b8b845fa1e1bd (diff) | |
download | qpid-python-07c8c499649c725a226eeda3e0bfe58fa8ba984c.tar.gz |
Fix for QPID-534. Get now detects closure correctly. Also fixed broker to allow channel.close-ok (and fixed client to send it).
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@553441 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/BasicMessageChannel.cpp')
-rw-r--r-- | cpp/src/qpid/client/BasicMessageChannel.cpp | 23 |
1 files changed, 11 insertions, 12 deletions
diff --git a/cpp/src/qpid/client/BasicMessageChannel.cpp b/cpp/src/qpid/client/BasicMessageChannel.cpp index 91849c735e..60368268c0 100644 --- a/cpp/src/qpid/client/BasicMessageChannel.cpp +++ b/cpp/src/qpid/client/BasicMessageChannel.cpp @@ -100,34 +100,32 @@ void BasicMessageChannel::cancel(const std::string& tag, bool synch) { c = i->second; consumers.erase(i); } - if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) + if(c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); + } channel.sendAndReceiveSync<BasicCancelOkBody>( synch, make_shared_ptr(new BasicCancelBody(channel.version, tag, !synch))); } void BasicMessageChannel::close(){ - ConsumerMap consumersCopy; - { - Mutex::ScopedLock l(lock); - consumersCopy = consumers; - consumers.clear(); - } destGet.shutdown(); destDispatch.shutdown(); - for (ConsumerMap::iterator i=consumersCopy.begin(); - i != consumersCopy.end(); ++i) +} + +void BasicMessageChannel::cancelAll(){ + Mutex::ScopedLock l(lock); + for (ConsumerMap::iterator i = consumers.begin(); i != consumers.end(); i++) { Consumer& c = i->second; - if ((c.ackMode == LAZY_ACK || c.ackMode == AUTO_ACK) - && c.lastDeliveryTag > 0) + if (c.ackMode == LAZY_ACK && c.lastDeliveryTag > 0) { channel.send(new BasicAckBody(channel.version, c.lastDeliveryTag, true)); } + channel.send(new BasicCancelBody(channel.version, i->first, true)); } + consumers.clear(); } - bool BasicMessageChannel::get( Message& msg, const Queue& queue, AckMode ackMode) { @@ -324,6 +322,7 @@ void BasicMessageChannel::run() { // Orderly shutdown. } catch (const Exception& e) { + std::cout << "Error caught by dispatch thread: " << e.what() << std::endl; // FIXME aconway 2007-02-20: Report exception to user. QPID_LOG(error, e.what()); } |