summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/SessionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp43
1 files changed, 35 insertions, 8 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 969ad93da9..33a3e226ff 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -24,6 +24,8 @@
#include "qpid/client/amqp0_10/SenderImpl.h"
#include "qpid/client/amqp0_10/MessageSource.h"
#include "qpid/client/amqp0_10/MessageSink.h"
+#include "qpid/client/SessionBase_0_10Access.h"
+#include "qpid/client/SessionImpl.h"
#include "qpid/messaging/PrivateImplRef.h"
#include "qpid/Exception.h"
#include "qpid/log/Statement.h"
@@ -34,12 +36,15 @@
#include "qpid/messaging/Sender.h"
#include "qpid/messaging/Receiver.h"
#include "qpid/messaging/Session.h"
-#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
#include <boost/function.hpp>
#include <boost/intrusive_ptr.hpp>
using qpid::messaging::KeyError;
+using qpid::messaging::NoMessageAvailable;
+using qpid::messaging::MessagingException;
+using qpid::messaging::TransactionAborted;
+using qpid::messaging::SessionError;
using qpid::messaging::MessageImplAccess;
using qpid::messaging::Sender;
using qpid::messaging::Receiver;
@@ -50,6 +55,11 @@ namespace amqp0_10 {
SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {}
+void SessionImpl::checkError()
+{
+ qpid::client::SessionBase_0_10Access s(session);
+ s.get()->assertOpen();
+}
void SessionImpl::sync(bool block)
{
@@ -60,7 +70,7 @@ void SessionImpl::sync(bool block)
void SessionImpl::commit()
{
if (!execute<Commit>()) {
- throw Exception();//TODO: what type?
+ throw TransactionAborted("Transaction aborted due to transport failure");
}
}
@@ -141,6 +151,7 @@ void SessionImpl::setSession(qpid::client::Session s)
for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
}
+ session.sync();
}
struct SessionImpl::CreateReceiver : Command
@@ -219,7 +230,7 @@ SessionImpl& SessionImpl::convert(qpid::messaging::Session& s)
{
boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s);
if (!impl) {
- throw qpid::Exception(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl"));
+ throw SessionError(QPID_MSG("Configuration error; require qpid::client::amqp0_10::SessionImpl"));
}
return *impl;
}
@@ -297,7 +308,7 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
if (incoming.getNextDestination(destination, adjust(timeout))) {
Receivers::const_iterator i = receivers.find(destination);
if (i == receivers.end()) {
- throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination));
+ throw qpid::messaging::ReceiverError(QPID_MSG("Received message for unknown destination " << destination));
} else {
receiver = i->second;
}
@@ -307,6 +318,17 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
}
} catch (TransportFailure&) {
reconnect();
+ } catch (const qpid::framing::ResourceLimitExceededException& e) {
+ if (backoff()) return false;
+ else throw qpid::messaging::TargetCapacityExceeded(e.what());
+ } catch (const qpid::framing::UnauthorizedAccessException& e) {
+ throw qpid::messaging::UnauthorizedAccess(e.what());
+ } catch (const qpid::SessionException& e) {
+ throw qpid::messaging::SessionError(e.what());
+ } catch (const qpid::ConnectionException& e) {
+ throw qpid::messaging::ConnectionError(e.what());
+ } catch (const qpid::ChannelException& e) {
+ throw qpid::messaging::MessagingException(e.what());
}
}
}
@@ -314,8 +336,8 @@ bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::messag
qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::messaging::Duration timeout)
{
qpid::messaging::Receiver receiver;
- if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable();
- if (!receiver) throw qpid::Exception("Bad receiver returned!");
+ if (!nextReceiver(receiver, timeout)) throw NoMessageAvailable();
+ if (!receiver) throw SessionError("Bad receiver returned!");
return receiver;
}
@@ -377,7 +399,7 @@ uint32_t SessionImpl::pendingAckImpl(const std::string* destination)
void SessionImpl::syncImpl(bool block)
{
if (block) session.sync();
- else session.sendSyncRequest();
+ else session.flush();
}
void SessionImpl::commitImpl()
@@ -435,7 +457,12 @@ void SessionImpl::senderCancelled(const std::string& name)
void SessionImpl::reconnect()
{
- connection->connect();
+ connection->open();
+}
+
+bool SessionImpl::backoff()
+{
+ return connection->backoff();
}
qpid::messaging::Connection SessionImpl::getConnection() const