summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2012-11-28 14:13:11 +0000
committerGordon Sim <gsim@apache.org>2012-11-28 14:13:11 +0000
commitd74aa0673954287cc39523e6da07bc43c39d95e4 (patch)
treea418942330a3285ffb0a7d257dcc3ac94e27b911
parentb52e9f5e55b16da425a7cb5de1e6dad30db78557 (diff)
downloadqpid-python-d74aa0673954287cc39523e6da07bc43c39d95e4.tar.gz
QPID-4451: wait for outgoing messages to settle when closing
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.20@1414705 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp9
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp6
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp9
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h1
5 files changed, 25 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 67c7f29448..8ad63e325f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -149,7 +149,14 @@ void ConnectionContext::close()
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
if (state != CONNECTED) return;
if (!(pn_connection_state(connection) & PN_LOCAL_CLOSED)) {
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i){
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ //wait for outstanding sends to settle
+ while (!i->second->settled()) {
+ QPID_LOG(debug, "Waiting for sends to settle before closing");
+ wait();//wait until message has been confirmed
+ }
+
+
if (!(pn_session_state(i->second->session) & PN_LOCAL_CLOSED)) {
pn_session_close(i->second->session);
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 95398fea6f..e15c645e2c 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -350,4 +350,10 @@ void SenderContext::configure(pn_terminus_t* target) const
helper.setNodeProperties(target);
}
}
+
+bool SenderContext::settled()
+{
+ return processUnsettled() == 0;
+}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
index 0202d6aa4b..366a3f1e79 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -69,6 +69,7 @@ class SenderContext
const std::string& getTarget() const;
Delivery* send(const qpid::messaging::Message& message);
void configure() const;
+ bool settled();
private:
friend class ConnectionContext;
typedef std::deque<Delivery> Deliveries;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 8b3feb129a..9bdc658bc7 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -144,4 +144,13 @@ void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool c
}
}
+bool SessionContext::settled()
+{
+ bool result = true;
+ for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
+ if (!i->second->settled()) result = false;
+ }
+ return result;
+}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
index fbc8731230..eca30a0e97 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -59,6 +59,7 @@ class SessionContext
boost::shared_ptr<ReceiverContext> nextReceiver(qpid::messaging::Duration timeout);
uint32_t getReceivable();
uint32_t getUnsettledAcks();
+ bool settled();
private:
friend class ConnectionContext;
typedef std::map<std::string, boost::shared_ptr<SenderContext> > SenderMap;