diff options
Diffstat (limited to 'cpp/src/qpid/messaging/amqp/ConnectionContext.cpp')
| -rw-r--r-- | cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 66 |
1 files changed, 60 insertions, 6 deletions
diff --git a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 9febe66f7e..9036031931 100644 --- a/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -193,6 +193,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar { { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + checkClosed(ssn, lnk); if (!lnk->capacity) { pn_link_flow(lnk->receiver, 1); wakeupDriver(); @@ -212,7 +213,7 @@ bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shar wakeupDriver(); while (pn_link_credit(lnk->receiver) && !pn_link_queued(lnk->receiver)) { QPID_LOG(debug, "Waiting for message or for credit to be drained: credit=" << pn_link_credit(lnk->receiver) << ", queued=" << pn_link_queued(lnk->receiver)); - wait(); + wait(ssn, lnk); } if (lnk->capacity && pn_link_queued(lnk->receiver) == 0) { pn_link_flow(lnk->receiver, lnk->capacity); @@ -247,6 +248,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared qpid::sys::AbsTime until(convert(timeout)); while (true) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + checkClosed(ssn, lnk); pn_delivery_t* current = pn_link_current((pn_link_t*) lnk->receiver); QPID_LOG(debug, "In ConnectionContext::get(), current=" << current); if (current) { @@ -262,7 +264,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared pn_link_advance(lnk->receiver); return true; } else if (until > qpid::sys::now()) { - wait(); + wait(ssn, lnk); } else { return false; } @@ -273,6 +275,7 @@ bool ConnectionContext::get(boost::shared_ptr<SessionContext> ssn, boost::shared void ConnectionContext::acknowledge(boost::shared_ptr<SessionContext> ssn, qpid::messaging::Message* message, bool cumulative) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + checkClosed(ssn); if (message) { ssn->acknowledge(MessageImplAccess::get(*message).getInternalId(), cumulative); } else { @@ -329,19 +332,20 @@ void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int c } } -void ConnectionContext::send(boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync) +void ConnectionContext::send(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> snd, const qpid::messaging::Message& message, bool sync) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + checkClosed(ssn); SenderContext::Delivery* delivery(0); while (!(delivery = snd->send(message))) { QPID_LOG(debug, "Waiting for capacity..."); - wait();//wait for capacity + wait(ssn, snd);//wait for capacity } wakeupDriver(); if (sync) { while (!delivery->accepted()) { QPID_LOG(debug, "Waiting for confirmation..."); - wait();//wait until message has been confirmed + wait(ssn, snd);//wait until message has been confirmed } } } @@ -408,15 +412,65 @@ void ConnectionContext::wakeupDriver() } } +namespace { +pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; +pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED; +} + void ConnectionContext::wait() { lock.wait(); if (state == DISCONNECTED) { throw qpid::messaging::TransportFailure("Disconnected"); } - //check for any closed links, sessions or indeed the connection + if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + pn_connection_close(connection); + throw qpid::messaging::ConnectionError("Connection closed by peer"); + } +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn) +{ + wait(); + checkClosed(ssn); +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + wait(); + checkClosed(ssn, lnk); +} +void ConnectionContext::wait(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + wait(); + checkClosed(ssn, lnk); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn) +{ + if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + pn_session_close(ssn->session); + throw qpid::messaging::SessionError("Session ended by peer"); + } else if ((pn_session_state(ssn->session) & IS_CLOSED) == IS_CLOSED) { + throw qpid::messaging::SessionError("Session has ended"); + } } +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) +{ + checkClosed(ssn, lnk->receiver); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) +{ + checkClosed(ssn, lnk->sender); +} +void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_link_t* lnk) +{ + checkClosed(ssn); + if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + pn_link_close(lnk); + throw qpid::messaging::LinkError("Link detached by peer"); + } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) { + throw qpid::messaging::LinkError("Link is not attached"); + } +} boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); |
