diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/SessionImpl.cpp')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 43 |
1 files changed, 35 insertions, 8 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 969ad93da9..33a3e226ff 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -24,6 +24,8 @@ #include "qpid/client/amqp0_10/SenderImpl.h" #include "qpid/client/amqp0_10/MessageSource.h" #include "qpid/client/amqp0_10/MessageSink.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/SessionImpl.h" #include "qpid/messaging/PrivateImplRef.h" #include "qpid/Exception.h" #include "qpid/log/Statement.h" @@ -34,12 +36,15 @@ #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Session.h" -#include "qpid/framing/reply_exceptions.h" #include <boost/format.hpp> #include <boost/function.hpp> #include <boost/intrusive_ptr.hpp> using qpid::messaging::KeyError; +using qpid::messaging::NoMessageAvailable; +using qpid::messaging::MessagingException; +using qpid::messaging::TransactionAborted; +using qpid::messaging::SessionError; using qpid::messaging::MessageImplAccess; using qpid::messaging::Sender; using qpid::messaging::Receiver; @@ -50,6 +55,11 @@ namespace amqp0_10 { SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} +void SessionImpl::checkError() +{ + qpid::client::SessionBase_0_10Access s(session); + s.get()->assertOpen(); +} void SessionImpl::sync(bool block) { @@ -60,7 +70,7 @@ void SessionImpl::sync(bool block) void SessionImpl::commit() { if (!execute<Commit>()) { - throw Exception();//TODO: what type? + throw TransactionAborted("Transaction aborted due to transport failure"); } } @@ -141,6 +151,7 @@ void SessionImpl::setSession(qpid::client::Session s) for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); } + session.sync(); } struct SessionImpl::CreateReceiver : Command @@ -219,7 +230,7 @@ SessionImpl& SessionImpl::convert(qpid::messaging::Session& s) { boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s); if (!impl) { - throw qpid::Exception(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl")); + throw SessionError(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl")); } return *impl; } @@ -297,7 +308,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag if (incoming.getNextDestination(destination, adjust(timeout))) { Receivers::const_iterator i = receivers.find(destination); if (i == receivers.end()) { - throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination)); + throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination)); } else { receiver = i->second; } @@ -307,6 +318,17 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag } } catch (TransportFailure&) { reconnect(); + } catch (const qpid::framing::ResourceLimitExceededException& e) { + if (backoff()) return false; + else throw qpid::messaging::TargetCapacityExceeded(e.what()); + } catch (const qpid::framing::UnauthorizedAccessException& e) { + throw qpid::messaging::UnauthorizedAccess(e.what()); + } catch (const qpid::SessionException& e) { + throw qpid::messaging::SessionError(e.what()); + } catch (const qpid::ConnectionException& e) { + throw qpid::messaging::ConnectionError(e.what()); + } catch (const qpid::ChannelException& e) { + throw qpid::messaging::MessagingException(e.what()); } } } @@ -314,8 +336,8 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration timeout) { qpid::messaging::Receiver receiver; - if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable(); - if (!receiver) throw qpid::Exception("Bad receiver returned!"); + if (!nextReceiver(receiver, timeout)) throw NoMessageAvailable(); + if (!receiver) throw SessionError("Bad receiver returned!"); return receiver; } @@ -377,7 +399,7 @@ uint32_t SessionImpl::pendingAckImpl(const std::string* destination) void SessionImpl::syncImpl(bool block) { if (block) session.sync(); - else session.sendSyncRequest(); + else session.flush(); } void SessionImpl::commitImpl() @@ -435,7 +457,12 @@ void SessionImpl::senderCancelled(const std::string& name) void SessionImpl::reconnect() { - connection->connect(); + connection->open(); +} + +bool SessionImpl::backoff() +{ + return connection->backoff(); } qpid::messaging::Connection SessionImpl::getConnection() const |