diff options
author | Gordon Sim <gsim@apache.org> | 2012-11-28 14:13:11 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2012-11-28 14:13:11 +0000 |
commit | d74aa0673954287cc39523e6da07bc43c39d95e4 (patch) | |
tree | a418942330a3285ffb0a7d257dcc3ac94e27b911 | |
parent | b52e9f5e55b16da425a7cb5de1e6dad30db78557 (diff) | |
download | qpid-python-d74aa0673954287cc39523e6da07bc43c39d95e4.tar.gz |
QPID-4451: wait for outgoing messages to settle when closing
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1414705 13f79535-47bb-0310-9956-ffa450edef68
5 files changed, 25 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 67c7f29448..8ad63e325f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -149,7 +149,14 @@ void ConnectionContext::close() qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); if (state != CONNECTED) return; if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) { - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i){ + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + //wait for outstanding sends to settle + while (!i->second->settled()) { + QPID_LOG(debug, "Waiting for sends to settle before closing"); + wait();//wait until message has been confirmed + } + + if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) { pn_session_close(i->second->session); } diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 95398fea6f..e15c645e2c 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -350,4 +350,10 @@ void SenderContext::configure(pn_terminus_t* target) const helper.setNodeProperties(target); } } + +bool SenderContext::settled() +{ + return processUnsettled() == 0; +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index 0202d6aa4b..366a3f1e79 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -69,6 +69,7 @@ class SenderContext const std::string& getTarget() const; Delivery* send(const qpid::messaging::Message& message); void configure() const; + bool settled(); private: friend class ConnectionContext; typedef std::deque<Delivery> Deliveries; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 8b3feb129a..9bdc658bc7 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -144,4 +144,13 @@ void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool c } } +bool SessionContext::settled() +{ + bool result = true; + for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { + if (!i->second->settled()) result = false; + } + return result; +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index fbc8731230..eca30a0e97 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -59,6 +59,7 @@ class SessionContext boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout); uint32_t getReceivable(); uint32_t getUnsettledAcks(); + bool settled(); private: friend class ConnectionContext; typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap; |