summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-10-12 16:04:47 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-10-12 16:04:47 +0000
commitc6621dcb50cb819ee370311ff252134bbcf96c60 (patch)
tree9028cd5fc9c74a12661d116fa3a322368c2913e6
parentfa5145ec6ac34efdc3009b4a09e945aa446c16fb (diff)
downloadqpid-python-c6621dcb50cb819ee370311ff252134bbcf96c60.tar.gz
Serialise close into the data callbacks:
Rejig Rdma::ConnectionManager to have a stop function with a callback and use this to ensure that the Rdma::Connector used by qpid::sys::RdmaConnector is correctly deleted only after it has been actually stopped git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1021819 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/RdmaConnector.cpp84
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp15
-rw-r--r--qpid/cpp/src/qpid/sys/rdma/RdmaIO.h11
3 files changed, 72 insertions, 38 deletions
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
index 208d42f672..026952bd99 100644
--- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
@@ -73,7 +73,7 @@ class RdmaConnector : public Connector, public sys::Codec
framing::OutputHandler* output;
Rdma::AsynchIO* aio;
- std::auto_ptr<Rdma::Connector> acon;
+ Rdma::Connector* acon;
sys::Poller::shared_ptr poller;
std::auto_ptr<qpid::sys::SecurityLayer> securityLayer;
@@ -82,7 +82,6 @@ class RdmaConnector : public Connector, public sys::Codec
// Callbacks
void connected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&);
void connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType);
- void disconnectAction();
void disconnected();
void rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams&);
@@ -91,7 +90,8 @@ class RdmaConnector : public Connector, public sys::Codec
void writeDataBlock(const framing::AMQDataBlock& data);
void dataError(Rdma::AsynchIO&);
void drained();
- void stopped(Rdma::AsynchIO* aio=0);
+ void connectionStopped(Rdma::Connector* acon);
+ void dataStopped(Rdma::AsynchIO* aio);
std::string identifier;
@@ -147,6 +147,7 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p,
polling(false),
shutdownHandler(0),
aio(0),
+ acon(0),
poller(p)
{
QPID_LOG(debug, "RdmaConnector created for " << version);
@@ -156,14 +157,20 @@ namespace {
void deleteAsynchIO(Rdma::AsynchIO& aio) {
delete &aio;
}
+
+ void deleteConnector(Rdma::ConnectionManager& con) {
+ delete &con;
+ }
}
RdmaConnector::~RdmaConnector() {
QPID_LOG(debug, "~RdmaConnector " << identifier);
- close();
if (aio) {
aio->stop(deleteAsynchIO);
}
+ if (acon) {
+ acon->stop(deleteConnector);
+ }
if (shutdownHandler) {
shutdownHandler->shutdown();
}
@@ -173,12 +180,12 @@ void RdmaConnector::connect(const std::string& host, int port){
Mutex::ScopedLock l(pollingLock);
assert(!polling);
- acon.reset(new Rdma::Connector(
+ acon = new Rdma::Connector(
Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
boost::bind(&RdmaConnector::connected, this, poller, _1, _2),
boost::bind(&RdmaConnector::connectionError, this, poller, _1, _2),
boost::bind(&RdmaConnector::disconnected, this),
- boost::bind(&RdmaConnector::rejected, this, poller, _1, _2)));
+ boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
polling = true;
@@ -211,11 +218,10 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru
}
{
Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to stopped() anyway
- if (!polling) return;
+ assert(polling);
polling = false;
}
- stopped();
+ connectionStopped(acon);
}
void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) {
@@ -226,10 +232,10 @@ void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::i
if (!polling) return;
polling = false;
}
- stopped();
+ connectionStopped(acon);
}
-void RdmaConnector::disconnectAction() {
+void RdmaConnector::disconnected() {
QPID_LOG(debug, "Connection disconnected " << identifier);
{
Mutex::ScopedLock l(pollingLock);
@@ -237,11 +243,8 @@ void RdmaConnector::disconnectAction() {
if (!polling) return;
polling = false;
}
- drained();
-}
-
-void RdmaConnector::disconnected() {
- aio->requestCallback(boost::bind(&RdmaConnector::disconnectAction, this));
+ // Make sure that all the disconnected actions take place on the data "thread"
+ aio->requestCallback(boost::bind(&RdmaConnector::drained, this));
}
void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) {
@@ -252,7 +255,7 @@ void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusiv
if (!polling) return;
polling = false;
}
- stopped();
+ connectionStopped(acon);
}
void RdmaConnector::dataError(Rdma::AsynchIO&) {
@@ -266,35 +269,48 @@ void RdmaConnector::dataError(Rdma::AsynchIO&) {
drained();
}
-void RdmaConnector::stopped(Rdma::AsynchIO* a) {
- QPID_LOG(debug, "RdmaConnector::stopped " << identifier);
- assert(!polling);
- aio = 0;
- delete a;
- if (shutdownHandler) {
- ShutdownHandler* s = shutdownHandler;
- shutdownHandler = 0;
- s->shutdown();
+void RdmaConnector::close() {
+ QPID_LOG(debug, "RdmaConnector::close " << identifier);
+ {
+ Mutex::ScopedLock l(pollingLock);
+ if (!polling) return;
+ polling = false;
}
+ if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
}
void RdmaConnector::drained() {
QPID_LOG(debug, "RdmaConnector::drained " << identifier);
assert(!polling);
- acon->stop();
if (aio) {
Rdma::AsynchIO* a = aio;
aio = 0;
- a->stop(boost::bind(&RdmaConnector::stopped, this, a));
+ a->stop(boost::bind(&RdmaConnector::dataStopped, this, a));
}
}
-void RdmaConnector::close() {
- QPID_LOG(debug, "RdmaConnector::close " << identifier);
- Mutex::ScopedLock l(pollingLock);
- if (!polling) return;
- polling = false;
- if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
+void RdmaConnector::dataStopped(Rdma::AsynchIO* a) {
+ QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier);
+ assert(!polling);
+ aio = 0;
+ delete a;
+ if (acon) {
+ Rdma::Connector* c = acon;
+ acon = 0;
+ c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c));
+ }
+}
+
+void RdmaConnector::connectionStopped(Rdma::Connector* c) {
+ QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier);
+ assert(!polling);
+ acon = 0;
+ delete c;
+ if (shutdownHandler) {
+ ShutdownHandler* s = shutdownHandler;
+ shutdownHandler = 0;
+ s->shutdown();
+ }
}
void RdmaConnector::setInputHandler(InputHandler* handler){
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
index fe7062d3ea..23660a0b9f 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
@@ -518,6 +518,7 @@ namespace Rdma {
ErrorCallback errc,
DisconnectedCallback dc
) :
+ state(IDLE),
ci(Connection::make()),
handle(*ci, boost::bind(&ConnectionManager::event, this, _1), 0, 0),
errorCallback(errc),
@@ -537,11 +538,23 @@ namespace Rdma {
handle.startWatch(poller);
}
- void ConnectionManager::stop() {
+ void ConnectionManager::doStoppedCallback() {
+ // Ensure we can't get any more callbacks (except for the stopped callback)
handle.stopWatch();
+
+ NotifyCallback nc;
+ nc.swap(notifyCallback);
+ nc(*this);
+ }
+
+ void ConnectionManager::stop(NotifyCallback nc) {
+ state = STOPPED;
+ notifyCallback = nc;
+ handle.call(boost::bind(&ConnectionManager::doStoppedCallback, this));
}
void ConnectionManager::event(DispatchHandle&) {
+ if (state.get() == STOPPED) return;
connectionEvent(ci);
}
diff --git a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
index 55174ea8a1..00eba28716 100644
--- a/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
+++ b/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
@@ -26,7 +26,6 @@
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/DispatchHandle.h"
-#include "qpid/sys/Mutex.h"
#include "qpid/sys/SocketAddress.h"
#include <netinet/in.h>
@@ -163,14 +162,19 @@ namespace Rdma {
typedef boost::function1<void, Rdma::Connection::intrusive_ptr> DisconnectedCallback;
class ConnectionManager {
+ typedef boost::function1<void, ConnectionManager&> NotifyCallback;
+
+ enum State {IDLE, STOPPED};
+ qpid::sys::AtomicValue<State> state;
Connection::intrusive_ptr ci;
qpid::sys::DispatchHandleRef handle;
+ NotifyCallback notifyCallback;
protected:
ErrorCallback errorCallback;
DisconnectedCallback disconnectedCallback;
- public:
+ public:
ConnectionManager(
ErrorCallback errc,
DisconnectedCallback dc
@@ -179,10 +183,11 @@ namespace Rdma {
virtual ~ConnectionManager();
void start(qpid::sys::Poller::shared_ptr poller, const qpid::sys::SocketAddress& addr);
- void stop();
+ void stop(NotifyCallback);
private:
void event(qpid::sys::DispatchHandle& handle);
+ void doStoppedCallback();
virtual void startConnection(Connection::intrusive_ptr ci, const qpid::sys::SocketAddress& addr) = 0;
virtual void connectionEvent(Connection::intrusive_ptr ci) = 0;