diff options
Diffstat (limited to 'qpid/cpp/src/qmf/ResilientConnection.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/ResilientConnection.cpp | 76 |
1 files changed, 42 insertions, 34 deletions
diff --git a/qpid/cpp/src/qmf/ResilientConnection.cpp b/qpid/cpp/src/qmf/ResilientConnection.cpp index 610306f896..7ec03cf4da 100644 --- a/qpid/cpp/src/qmf/ResilientConnection.cpp +++ b/qpid/cpp/src/qmf/ResilientConnection.cpp @@ -19,6 +19,8 @@ #include "qmf/ResilientConnection.h" #include "qmf/MessageImpl.h" +#include "qmf/ConnectionSettingsImpl.h" +#include <qpid/client/Connection.h> #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> #include <qpid/client/SubscriptionManager.h> @@ -39,7 +41,7 @@ using namespace std; using namespace qmf; -using namespace qpid::client; +using namespace qpid; using qpid::sys::Mutex; namespace qmf { @@ -55,30 +57,29 @@ namespace qmf { ResilientConnectionEvent copy(); }; - struct RCSession : public MessageListener, public qpid::sys::Runnable, public qpid::RefCounted { + struct RCSession : public client::MessageListener, public qpid::sys::Runnable, public qpid::RefCounted { typedef boost::intrusive_ptr<RCSession> Ptr; ResilientConnectionImpl& connImpl; string name; - Connection& connection; - Session session; - SubscriptionManager* subscriptions; + client::Connection& connection; + client::Session session; + client::SubscriptionManager* subscriptions; void* userContext; vector<string> dests; qpid::sys::Thread thread; - RCSession(ResilientConnectionImpl& ci, const string& n, Connection& c, void* uc) : + RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) : connImpl(ci), name(n), connection(c), session(connection.newSession(name)), - subscriptions(new SubscriptionManager(session)), userContext(uc), thread(*this) {} + subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this) {} ~RCSession(); - void received(qpid::client::Message& msg); + void received(client::Message& msg); void run(); void stop(); }; class ResilientConnectionImpl : public qpid::sys::Runnable { public: - ResilientConnectionImpl(ConnectionSettings& settings, - int dmin, int dmax, int dfactor); + ResilientConnectionImpl(const ConnectionSettings& settings); ~ResilientConnectionImpl(); bool isConnected() const; @@ -107,8 +108,8 @@ namespace qmf { bool connected; bool shutdown; string lastError; - ConnectionSettings settings; - Connection connection; + const ConnectionSettings settings; + client::Connection connection; mutable qpid::sys::Mutex lock; int delayMin; int delayMax; @@ -155,7 +156,7 @@ void RCSession::stop() subscriptions->stop(); } -void RCSession::received(qpid::client::Message& msg) +void RCSession::received(client::Message& msg) { qmf::MessageImpl qmsg; qmsg.body = msg.getData(); @@ -174,12 +175,11 @@ void RCSession::received(qpid::client::Message& msg) connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg); } -ResilientConnectionImpl::ResilientConnectionImpl(ConnectionSettings& _settings, - int dmin, int dmax, int dfactor) : - notifyFd(-1), connected(false), shutdown(false), settings(_settings), - delayMin(dmin), delayMax(dmax), delayFactor(dfactor), connThread(*this) +ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) : + notifyFd(-1), connected(false), shutdown(false), settings(_settings), delayMin(1), connThread(*this) { connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this)); + settings.impl->getRetrySettings(&delayMin, &delayMax, &delayFactor); } ResilientConnectionImpl::~ResilientConnectionImpl() @@ -222,7 +222,7 @@ bool ResilientConnectionImpl::createSession(const char* name, void* sessionConte RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext)); - handle.handle = (void*) sess.get(); + handle.impl = (void*) sess.get(); sessions.insert(sess); return true; @@ -231,7 +231,7 @@ bool ResilientConnectionImpl::createSession(const char* name, void* sessionConte void ResilientConnectionImpl::destroySession(SessionHandle handle) { Mutex::ScopedLock _lock(lock); - RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle); + RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl); set<RCSession::Ptr>::iterator iter = sessions.find(sess); if (iter != sessions.end()) { for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++) @@ -247,7 +247,7 @@ void ResilientConnectionImpl::destroySession(SessionHandle handle) void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& message) { Mutex::ScopedLock _lock(lock); - RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle); + RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl); set<RCSession::Ptr>::iterator iter = sessions.find(sess); qpid::client::Message msg; string data(message.body, message.length); @@ -256,7 +256,7 @@ void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& me msg.setData(data); try { - sess->session.messageTransfer(arg::content=msg, arg::destination=message.destination); + sess->session.messageTransfer(client::arg::content=msg, client::arg::destination=message.destination); } catch(exception& e) { QPID_LOG(error, "Session Exception during message-transfer: " << e.what()); sessions.erase(iter); @@ -267,19 +267,22 @@ void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& me void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue) { Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.handle; + RCSession* sess = (RCSession*) handle.impl; - sess->session.queueDeclare(arg::queue=queue, arg::autoDelete=true, arg::exclusive=true); + sess->session.queueDeclare(client::arg::queue=queue, client::arg::autoDelete=true, client::arg::exclusive=true); + sess->subscriptions->setAcceptMode(client::ACCEPT_MODE_NONE); + sess->subscriptions->setAcquireMode(client::ACQUIRE_MODE_PRE_ACQUIRED); sess->subscriptions->subscribe(*sess, queue, queue); + sess->subscriptions->setFlowControl(queue, client::FlowControl::unlimited()); sess->dests.push_back(string(queue)); } void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue) { Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.handle; + RCSession* sess = (RCSession*) handle.impl; - sess->session.queueDelete(arg::queue=queue); + sess->session.queueDelete(client::arg::queue=queue); for (vector<string>::iterator iter = sess->dests.begin(); iter != sess->dests.end(); iter++) if (*iter == queue) { @@ -293,18 +296,18 @@ void ResilientConnectionImpl::bind(SessionHandle handle, char* exchange, char* queue, char* key) { Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.handle; + RCSession* sess = (RCSession*) handle.impl; - sess->session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key); + sess->session.exchangeBind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key); } void ResilientConnectionImpl::unbind(SessionHandle handle, char* exchange, char* queue, char* key) { Mutex::ScopedLock _lock(lock); - RCSession* sess = (RCSession*) handle.handle; + RCSession* sess = (RCSession*) handle.impl; - sess->session.exchangeUnbind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key); + sess->session.exchangeUnbind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key); } void ResilientConnectionImpl::setNotifyFd(int fd) @@ -318,7 +321,8 @@ void ResilientConnectionImpl::run() while (true) { try { - connection.open(settings); + QPID_LOG(trace, "Trying to open connection..."); + connection.open(settings.impl->getClientSettings()); { Mutex::ScopedLock _lock(lock); connected = true; @@ -326,6 +330,7 @@ void ResilientConnectionImpl::run() while (connected) cond.wait(lock); + delay = delayMin; while (!sessions.empty()) { set<RCSession::Ptr>::iterator iter = sessions.begin(); @@ -334,6 +339,11 @@ void ResilientConnectionImpl::run() EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext); Mutex::ScopedUnlock _u(lock); sess->stop(); + + // Nullify the intrusive pointer within the scoped unlock, otherwise, + // the reference is held until overwritted above (under lock) which causes + // the session destructor to be called with the lock held. + sess = 0; } EnqueueEvent(ResilientConnectionEvent::DISCONNECTED); @@ -341,7 +351,6 @@ void ResilientConnectionImpl::run() if (shutdown) return; } - delay = delayMin; connection.close(); } catch (exception &e) { QPID_LOG(debug, "connection.open exception: " << e.what()); @@ -396,10 +405,9 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k // Wrappers //================================================================== -ResilientConnection::ResilientConnection(ConnectionSettings& settings, - int delayMin, int delayMax, int delayFactor) +ResilientConnection::ResilientConnection(const ConnectionSettings& settings) { - impl = new ResilientConnectionImpl(settings, delayMin, delayMax, delayFactor); + impl = new ResilientConnectionImpl(settings); } ResilientConnection::~ResilientConnection() |