summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-09-08 16:49:30 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-09-08 16:49:30 +0000
commit26f29811c8103c1d00b38a9a0d3754165e2770fa (patch)
tree1af343558aa767d40efdf28e89fd1c5dc21c9916 /cpp/src
parent518596d72441fb1ed8a04717e378a9296d8bdc76 (diff)
downloadqpid-python-26f29811c8103c1d00b38a9a0d3754165e2770fa.tar.gz
Improve daemon handling of unexpected RDMA disconnects from client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@995139 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/sys/RdmaIOPlugin.cpp69
1 files changed, 48 insertions, 21 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index 7c2dc77caa..09c9770a5b 100644
--- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -51,13 +51,16 @@ class RdmaIOHandler : public OutputControl {
ConnectionCodec* codec;
bool readError;
+ sys::Mutex pollingLock;
+ bool polling;
+
void write(const framing::ProtocolInitiation&);
public:
RdmaIOHandler(Rdma::Connection::intrusive_ptr c, ConnectionCodec::Factory* f);
~RdmaIOHandler();
void init(Rdma::AsynchIO* a);
- void start(Poller::shared_ptr poller) {aio->start(poller);}
+ void start(Poller::shared_ptr poller);
// Output side
void close();
@@ -74,7 +77,8 @@ class RdmaIOHandler : public OutputControl {
void full(Rdma::AsynchIO& aio);
void idle(Rdma::AsynchIO& aio);
void error(Rdma::AsynchIO& aio);
- void drained(Rdma::AsynchIO& aio);
+ void disconnected();
+ void drained();
};
RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) :
@@ -82,26 +86,29 @@ RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::Conne
identifier(c->getPeerName()),
factory(f),
codec(0),
- readError(false)
+ readError(false),
+ polling(false)
{
}
+RdmaIOHandler::~RdmaIOHandler() {
+ if (codec)
+ codec->closed();
+ delete codec;
+ delete aio;
+}
+
void RdmaIOHandler::init(Rdma::AsynchIO* a) {
aio = a;
}
-namespace {
- void deleteAsynchIO(Rdma::AsynchIO& aio) {
- delete &aio;
- }
-}
+void RdmaIOHandler::start(Poller::shared_ptr poller) {
+ Mutex::ScopedLock l(pollingLock);
+ assert(!polling);
-RdmaIOHandler::~RdmaIOHandler() {
- if (codec)
- codec->closed();
- delete codec;
+ polling = true;
- aio->stop(deleteAsynchIO);
+ aio->start(poller);
}
void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
@@ -115,7 +122,10 @@ void RdmaIOHandler::write(const framing::ProtocolInitiation& data)
}
void RdmaIOHandler::close() {
- aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
+ Mutex::ScopedLock l(pollingLock);
+ if (!polling) return;
+ polling = false;
+ aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this));
}
// TODO: Dummy implementation, need to fill this in for heartbeat timeout to work
@@ -140,7 +150,7 @@ void RdmaIOHandler::idle(Rdma::AsynchIO&) {
aio->queueWrite(buff);
}
if (codec->isClosed())
- aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
+ close();
}
void RdmaIOHandler::initProtocolOut() {
@@ -153,10 +163,28 @@ void RdmaIOHandler::initProtocolOut() {
}
void RdmaIOHandler::error(Rdma::AsynchIO&) {
- close();
+ disconnected();
+}
+
+void RdmaIOHandler::disconnected() {
+ {
+ Mutex::ScopedLock l(pollingLock);
+ // If we're closed already then we'll get to drained() anyway
+ if (!polling) return;
+ polling = false;
+ }
+ drained();
+}
+
+namespace {
+ void stopped(RdmaIOHandler* async) {
+ delete async;
+ }
}
-void RdmaIOHandler::drained(Rdma::AsynchIO&) {
+void RdmaIOHandler::drained() {
+ assert(!polling);
+ aio->stop(boost::bind(&stopped, this));
}
void RdmaIOHandler::full(Rdma::AsynchIO&) {
@@ -186,7 +214,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) {
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
- aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
+ close();
}
}
@@ -205,7 +233,7 @@ void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) {
// send valid version header & close connection.
write(framing::ProtocolInitiation(framing::highestProtocolVersion));
readError = true;
- aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1));
+ close();
}
}
}
@@ -296,9 +324,8 @@ void RdmaIOProtocolFactory::disconnected(Rdma::Connection::intrusive_ptr ci) {
// If we've got a connection already tear it down, otherwise ignore
RdmaIOHandler* async = ci->getContext<RdmaIOHandler>();
if (async) {
- async->close();
+ async->disconnected();
}
- delete async;
}
uint16_t RdmaIOProtocolFactory::getPort() const {