summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/ConnectionContext.cpp')
-rw-r--r--cpp/src/qpid/messaging/amqp/ConnectionContext.cpp66
1 files changed, 60 insertions, 6 deletions
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 9febe66f7e..9036031931 100644
--- a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -193,6 +193,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
{
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn, lnk);
if (!lnk->capacity) {
pn_link_flow(lnk->receiver, 1);
wakeupDriver();
@@ -212,7 +213,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar
wakeupDriver();
while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) {
QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver));
- wait();
+ wait(ssn, lnk);
}
if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) {
pn_link_flow(lnk->receiver, lnk->capacity);
@@ -247,6 +248,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
qpid::sys::AbsTime until(convert(timeout));
while (true) {
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn, lnk);
pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver);
QPID_LOG(debug, "In ConnectionContext::get(), current=" << current);
if (current) {
@@ -262,7 +264,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
pn_link_advance(lnk->receiver);
return true;
} else if (until > qpid::sys::now()) {
- wait();
+ wait(ssn, lnk);
} else {
return false;
}
@@ -273,6 +275,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared
void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
if (message) {
ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative);
} else {
@@ -329,19 +332,20 @@ void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int c
}
}
-void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
+void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ checkClosed(ssn);
SenderContext::Delivery* delivery(0);
while (!(delivery = snd->send(message))) {
QPID_LOG(debug, "Waiting for capacity...");
- wait();//wait for capacity
+ wait(ssn, snd);//wait for capacity
}
wakeupDriver();
if (sync) {
while (!delivery->accepted()) {
QPID_LOG(debug, "Waiting for confirmation...");
- wait();//wait until message has been confirmed
+ wait(ssn, snd);//wait until message has been confirmed
}
}
}
@@ -408,15 +412,65 @@ void ConnectionContext::wakeupDriver()
}
}
+namespace {
+pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
+pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED;
+}
+
void ConnectionContext::wait()
{
lock.wait();
if (state == DISCONNECTED) {
throw qpid::messaging::TransportFailure("Disconnected");
}
- //check for any closed links, sessions or indeed the connection
+ if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_connection_close(connection);
+ throw qpid::messaging::ConnectionError("Connection closed by peer");
+ }
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn)
+{
+ wait();
+ checkClosed(ssn);
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ wait();
+ checkClosed(ssn, lnk);
+}
+void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+ wait();
+ checkClosed(ssn, lnk);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
+{
+ if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_session_close(ssn->session);
+ throw qpid::messaging::SessionError("Session ended by peer");
+ } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) {
+ throw qpid::messaging::SessionError("Session has ended");
+ }
}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
+{
+ checkClosed(ssn, lnk->receiver);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
+{
+ checkClosed(ssn, lnk->sender);
+}
+void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_link_t* lnk)
+{
+ checkClosed(ssn);
+ if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_link_close(lnk);
+ throw qpid::messaging::LinkError("Link detached by peer");
+ } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) {
+ throw qpid::messaging::LinkError("Link is not attached");
+ }
+}
boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);