summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/ResilientConnection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/ResilientConnection.cpp')
-rw-r--r--qpid/cpp/src/qmf/ResilientConnection.cpp76
1 files changed, 42 insertions, 34 deletions
diff --git a/qpid/cpp/src/qmf/ResilientConnection.cpp b/qpid/cpp/src/qmf/ResilientConnection.cpp
index 610306f896..7ec03cf4da 100644
--- a/qpid/cpp/src/qmf/ResilientConnection.cpp
+++ b/qpid/cpp/src/qmf/ResilientConnection.cpp
@@ -19,6 +19,8 @@
#include "qmf/ResilientConnection.h"
#include "qmf/MessageImpl.h"
+#include "qmf/ConnectionSettingsImpl.h"
+#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>
@@ -39,7 +41,7 @@
using namespace std;
using namespace qmf;
-using namespace qpid::client;
+using namespace qpid;
using qpid::sys::Mutex;
namespace qmf {
@@ -55,30 +57,29 @@ namespace qmf {
ResilientConnectionEvent copy();
};
- struct RCSession : public MessageListener, public qpid::sys::Runnable, public qpid::RefCounted {
+ struct RCSession : public client::MessageListener, public qpid::sys::Runnable, public qpid::RefCounted {
typedef boost::intrusive_ptr<RCSession> Ptr;
ResilientConnectionImpl& connImpl;
string name;
- Connection& connection;
- Session session;
- SubscriptionManager* subscriptions;
+ client::Connection& connection;
+ client::Session session;
+ client::SubscriptionManager* subscriptions;
void* userContext;
vector<string> dests;
qpid::sys::Thread thread;
- RCSession(ResilientConnectionImpl& ci, const string& n, Connection& c, void* uc) :
+ RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) :
connImpl(ci), name(n), connection(c), session(connection.newSession(name)),
- subscriptions(new SubscriptionManager(session)), userContext(uc), thread(*this) {}
+ subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this) {}
~RCSession();
- void received(qpid::client::Message& msg);
+ void received(client::Message& msg);
void run();
void stop();
};
class ResilientConnectionImpl : public qpid::sys::Runnable {
public:
- ResilientConnectionImpl(ConnectionSettings& settings,
- int dmin, int dmax, int dfactor);
+ ResilientConnectionImpl(const ConnectionSettings& settings);
~ResilientConnectionImpl();
bool isConnected() const;
@@ -107,8 +108,8 @@ namespace qmf {
bool connected;
bool shutdown;
string lastError;
- ConnectionSettings settings;
- Connection connection;
+ const ConnectionSettings settings;
+ client::Connection connection;
mutable qpid::sys::Mutex lock;
int delayMin;
int delayMax;
@@ -155,7 +156,7 @@ void RCSession::stop()
subscriptions->stop();
}
-void RCSession::received(qpid::client::Message& msg)
+void RCSession::received(client::Message& msg)
{
qmf::MessageImpl qmsg;
qmsg.body = msg.getData();
@@ -174,12 +175,11 @@ void RCSession::received(qpid::client::Message& msg)
connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);
}
-ResilientConnectionImpl::ResilientConnectionImpl(ConnectionSettings& _settings,
- int dmin, int dmax, int dfactor) :
- notifyFd(-1), connected(false), shutdown(false), settings(_settings),
- delayMin(dmin), delayMax(dmax), delayFactor(dfactor), connThread(*this)
+ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) :
+ notifyFd(-1), connected(false), shutdown(false), settings(_settings), delayMin(1), connThread(*this)
{
connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this));
+ settings.impl->getRetrySettings(&delayMin, &delayMax, &delayFactor);
}
ResilientConnectionImpl::~ResilientConnectionImpl()
@@ -222,7 +222,7 @@ bool ResilientConnectionImpl::createSession(const char* name, void* sessionConte
RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext));
- handle.handle = (void*) sess.get();
+ handle.impl = (void*) sess.get();
sessions.insert(sess);
return true;
@@ -231,7 +231,7 @@ bool ResilientConnectionImpl::createSession(const char* name, void* sessionConte
void ResilientConnectionImpl::destroySession(SessionHandle handle)
{
Mutex::ScopedLock _lock(lock);
- RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle);
+ RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
set<RCSession::Ptr>::iterator iter = sessions.find(sess);
if (iter != sessions.end()) {
for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++)
@@ -247,7 +247,7 @@ void ResilientConnectionImpl::destroySession(SessionHandle handle)
void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& message)
{
Mutex::ScopedLock _lock(lock);
- RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.handle);
+ RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
set<RCSession::Ptr>::iterator iter = sessions.find(sess);
qpid::client::Message msg;
string data(message.body, message.length);
@@ -256,7 +256,7 @@ void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& me
msg.setData(data);
try {
- sess->session.messageTransfer(arg::content=msg, arg::destination=message.destination);
+ sess->session.messageTransfer(client::arg::content=msg, client::arg::destination=message.destination);
} catch(exception& e) {
QPID_LOG(error, "Session Exception during message-transfer: " << e.what());
sessions.erase(iter);
@@ -267,19 +267,22 @@ void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::Message& me
void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue)
{
Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.handle;
+ RCSession* sess = (RCSession*) handle.impl;
- sess->session.queueDeclare(arg::queue=queue, arg::autoDelete=true, arg::exclusive=true);
+ sess->session.queueDeclare(client::arg::queue=queue, client::arg::autoDelete=true, client::arg::exclusive=true);
+ sess->subscriptions->setAcceptMode(client::ACCEPT_MODE_NONE);
+ sess->subscriptions->setAcquireMode(client::ACQUIRE_MODE_PRE_ACQUIRED);
sess->subscriptions->subscribe(*sess, queue, queue);
+ sess->subscriptions->setFlowControl(queue, client::FlowControl::unlimited());
sess->dests.push_back(string(queue));
}
void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue)
{
Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.handle;
+ RCSession* sess = (RCSession*) handle.impl;
- sess->session.queueDelete(arg::queue=queue);
+ sess->session.queueDelete(client::arg::queue=queue);
for (vector<string>::iterator iter = sess->dests.begin();
iter != sess->dests.end(); iter++)
if (*iter == queue) {
@@ -293,18 +296,18 @@ void ResilientConnectionImpl::bind(SessionHandle handle,
char* exchange, char* queue, char* key)
{
Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.handle;
+ RCSession* sess = (RCSession*) handle.impl;
- sess->session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key);
+ sess->session.exchangeBind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
}
void ResilientConnectionImpl::unbind(SessionHandle handle,
char* exchange, char* queue, char* key)
{
Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.handle;
+ RCSession* sess = (RCSession*) handle.impl;
- sess->session.exchangeUnbind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=key);
+ sess->session.exchangeUnbind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
}
void ResilientConnectionImpl::setNotifyFd(int fd)
@@ -318,7 +321,8 @@ void ResilientConnectionImpl::run()
while (true) {
try {
- connection.open(settings);
+ QPID_LOG(trace, "Trying to open connection...");
+ connection.open(settings.impl->getClientSettings());
{
Mutex::ScopedLock _lock(lock);
connected = true;
@@ -326,6 +330,7 @@ void ResilientConnectionImpl::run()
while (connected)
cond.wait(lock);
+ delay = delayMin;
while (!sessions.empty()) {
set<RCSession::Ptr>::iterator iter = sessions.begin();
@@ -334,6 +339,11 @@ void ResilientConnectionImpl::run()
EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext);
Mutex::ScopedUnlock _u(lock);
sess->stop();
+
+ // Nullify the intrusive pointer within the scoped unlock, otherwise,
+ // the reference is held until overwritted above (under lock) which causes
+ // the session destructor to be called with the lock held.
+ sess = 0;
}
EnqueueEvent(ResilientConnectionEvent::DISCONNECTED);
@@ -341,7 +351,6 @@ void ResilientConnectionImpl::run()
if (shutdown)
return;
}
- delay = delayMin;
connection.close();
} catch (exception &e) {
QPID_LOG(debug, "connection.open exception: " << e.what());
@@ -396,10 +405,9 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k
// Wrappers
//==================================================================
-ResilientConnection::ResilientConnection(ConnectionSettings& settings,
- int delayMin, int delayMax, int delayFactor)
+ResilientConnection::ResilientConnection(const ConnectionSettings& settings)
{
- impl = new ResilientConnectionImpl(settings, delayMin, delayMax, delayFactor);
+ impl = new ResilientConnectionImpl(settings);
}
ResilientConnection::~ResilientConnection()