diff options
author | Andrew Stitcher <astitcher@apache.org> | 2010-09-08 16:49:30 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2010-09-08 16:49:30 +0000 |
commit | 26f29811c8103c1d00b38a9a0d3754165e2770fa (patch) | |
tree | 1af343558aa767d40efdf28e89fd1c5dc21c9916 /cpp/src | |
parent | 518596d72441fb1ed8a04717e378a9296d8bdc76 (diff) | |
download | qpid-python-26f29811c8103c1d00b38a9a0d3754165e2770fa.tar.gz |
Improve daemon handling of unexpected RDMA disconnects from client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@995139 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 69 |
1 files changed, 48 insertions, 21 deletions
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 7c2dc77caa..09c9770a5b 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -51,13 +51,16 @@ class RdmaIOHandler : public OutputControl { ConnectionCodec* codec; bool readError; + sys::Mutex pollingLock; + bool polling; + void write(const framing::ProtocolInitiation&); public: RdmaIOHandler(Rdma::Connection::intrusive_ptr c, ConnectionCodec::Factory* f); ~RdmaIOHandler(); void init(Rdma::AsynchIO* a); - void start(Poller::shared_ptr poller) {aio->start(poller);} + void start(Poller::shared_ptr poller); // Output side void close(); @@ -74,7 +77,8 @@ class RdmaIOHandler : public OutputControl { void full(Rdma::AsynchIO& aio); void idle(Rdma::AsynchIO& aio); void error(Rdma::AsynchIO& aio); - void drained(Rdma::AsynchIO& aio); + void disconnected(); + void drained(); }; RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::ConnectionCodec::Factory* f) : @@ -82,26 +86,29 @@ RdmaIOHandler::RdmaIOHandler(Rdma::Connection::intrusive_ptr c, qpid::sys::Conne identifier(c->getPeerName()), factory(f), codec(0), - readError(false) + readError(false), + polling(false) { } +RdmaIOHandler::~RdmaIOHandler() { + if (codec) + codec->closed(); + delete codec; + delete aio; +} + void RdmaIOHandler::init(Rdma::AsynchIO* a) { aio = a; } -namespace { - void deleteAsynchIO(Rdma::AsynchIO& aio) { - delete &aio; - } -} +void RdmaIOHandler::start(Poller::shared_ptr poller) { + Mutex::ScopedLock l(pollingLock); + assert(!polling); -RdmaIOHandler::~RdmaIOHandler() { - if (codec) - codec->closed(); - delete codec; + polling = true; - aio->stop(deleteAsynchIO); + aio->start(poller); } void RdmaIOHandler::write(const framing::ProtocolInitiation& data) @@ -115,7 +122,10 @@ void RdmaIOHandler::write(const framing::ProtocolInitiation& data) } void RdmaIOHandler::close() { - aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1)); + Mutex::ScopedLock l(pollingLock); + if (!polling) return; + polling = false; + aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this)); } // TODO: Dummy implementation, need to fill this in for heartbeat timeout to work @@ -140,7 +150,7 @@ void RdmaIOHandler::idle(Rdma::AsynchIO&) { aio->queueWrite(buff); } if (codec->isClosed()) - aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1)); + close(); } void RdmaIOHandler::initProtocolOut() { @@ -153,10 +163,28 @@ void RdmaIOHandler::initProtocolOut() { } void RdmaIOHandler::error(Rdma::AsynchIO&) { - close(); + disconnected(); +} + +void RdmaIOHandler::disconnected() { + { + Mutex::ScopedLock l(pollingLock); + // If we're closed already then we'll get to drained() anyway + if (!polling) return; + polling = false; + } + drained(); +} + +namespace { + void stopped(RdmaIOHandler* async) { + delete async; + } } -void RdmaIOHandler::drained(Rdma::AsynchIO&) { +void RdmaIOHandler::drained() { + assert(!polling); + aio->stop(boost::bind(&stopped, this)); } void RdmaIOHandler::full(Rdma::AsynchIO&) { @@ -186,7 +214,7 @@ void RdmaIOHandler::readbuff(Rdma::AsynchIO&, Rdma::Buffer* buff) { }catch(const std::exception& e){ QPID_LOG(error, e.what()); readError = true; - aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1)); + close(); } } @@ -205,7 +233,7 @@ void RdmaIOHandler::initProtocolIn(Rdma::Buffer* buff) { // send valid version header & close connection. write(framing::ProtocolInitiation(framing::highestProtocolVersion)); readError = true; - aio->drainWriteQueue(boost::bind(&RdmaIOHandler::drained, this, _1)); + close(); } } } @@ -296,9 +324,8 @@ void RdmaIOProtocolFactory::disconnected(Rdma::Connection::intrusive_ptr ci) { // If we've got a connection already tear it down, otherwise ignore RdmaIOHandler* async = ci->getContext<RdmaIOHandler>(); if (async) { - async->close(); + async->disconnected(); } - delete async; } uint16_t RdmaIOProtocolFactory::getPort() const { |