summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-10-12 16:05:05 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-10-12 16:05:05 +0000
commitfe9fca8fadffbb8658a001884b33c39fcd29f2c4 (patch)
tree893f0a17b612c43a8a3304483fdf95156963052a
parent18fe1fa364b7c0fa84d8e5eebdbe246e4c3fadee (diff)
downloadqpid-python-fe9fca8fadffbb8658a001884b33c39fcd29f2c4.tar.gz
Delay deleting the Rdma::AsynchIO associated with a Connection to just before
the callback to the ConnectionImpl shutdown function so that we make the possibility of race between a write coming down and deleting it as small as possible. Rearranged scope of polling boolean to indicate that the data channel is connected (or not) and changed name to better describe its function git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1021821 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/client/RdmaConnector.cpp124
1 files changed, 55 insertions, 69 deletions
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
index 5558e27752..79f86d09c2 100644
--- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
@@ -64,8 +64,8 @@ class RdmaConnector : public Connector, public sys::Codec
framing::ProtocolVersion version;
bool initiated;
- sys::Mutex pollingLock;
- bool polling;
+ sys::Mutex dataConnectedLock;
+ bool dataConnected;
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
@@ -90,7 +90,7 @@ class RdmaConnector : public Connector, public sys::Codec
void writeDataBlock(const framing::AMQDataBlock& data);
void dataError(Rdma::AsynchIO&);
void drained();
- void connectionStopped(Rdma::Connector* acon);
+ void connectionStopped(Rdma::Connector* acon, Rdma::AsynchIO* aio);
void dataStopped(Rdma::AsynchIO* aio);
std::string identifier;
@@ -144,7 +144,7 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p,
bounds(cimpl),
version(ver),
initiated(false),
- polling(false),
+ dataConnected(false),
shutdownHandler(0),
aio(0),
acon(0),
@@ -171,14 +171,11 @@ RdmaConnector::~RdmaConnector() {
if (acon) {
acon->stop(deleteConnector);
}
- if (shutdownHandler) {
- shutdownHandler->shutdown();
- }
}
void RdmaConnector::connect(const std::string& host, int port){
- Mutex::ScopedLock l(pollingLock);
- assert(!polling);
+ Mutex::ScopedLock l(dataConnectedLock);
+ assert(!dataConnected);
acon = new Rdma::Connector(
Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
@@ -187,8 +184,6 @@ void RdmaConnector::connect(const std::string& host, int port){
boost::bind(&RdmaConnector::disconnected, this),
boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
- polling = true;
-
SocketAddress sa(host, boost::lexical_cast<std::string>(port));
acon->start(poller, sa);
}
@@ -196,6 +191,8 @@ void RdmaConnector::connect(const std::string& host, int port){
// The following only gets run when connected
void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp) {
try {
+ Mutex::ScopedLock l(dataConnectedLock);
+ assert(!dataConnected);
Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
aio = new Rdma::AsynchIO(ci->getQueuePair(),
@@ -210,67 +207,54 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru
writeDataBlock(init);
aio->start(poller);
+
+ dataConnected = true;
+
return;
} catch (const Rdma::Exception& e) {
QPID_LOG(error, "Rdma: Cannot create new connection (Rdma exception): " << e.what());
} catch (const std::exception& e) {
QPID_LOG(error, "Rdma: Cannot create new connection (unknown exception): " << e.what());
}
- {
- Mutex::ScopedLock l(pollingLock);
- assert(polling);
- polling = false;
- }
- connectionStopped(acon);
+ dataConnected = false;
+ connectionStopped(acon, aio);
}
void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) {
QPID_LOG(debug, "Connection Error " << identifier);
- {
- Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to stopped() anyway
- if (!polling) return;
- polling = false;
- }
- connectionStopped(acon);
-}
-
-void RdmaConnector::disconnected() {
- QPID_LOG(debug, "Connection disconnected " << identifier);
- {
- Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to drained() anyway
- if (!polling) return;
- polling = false;
- }
- // Make sure that all the disconnected actions take place on the data "thread"
- aio->requestCallback(boost::bind(&RdmaConnector::drained, this));
+ connectionStopped(acon, aio);
}
// Bizarrely we seem to get rejected events *after* we've already got a connected event for some peer disconnects
// so we need to check whether the data connection is started or not in here
void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) {
QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
- {
- Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to stopped() anyway
- if (!polling) return;
- polling = false;
- }
if (dataConnected) {
disconnected();
} else {
- connectionStopped(acon);
+ connectionStopped(acon, aio);
}
}
+void RdmaConnector::disconnected() {
+ QPID_LOG(debug, "Connection disconnected " << identifier);
+ {
+ Mutex::ScopedLock l(dataConnectedLock);
+ // If we're closed already then we'll get to drained() anyway
+ if (!dataConnected) return;
+ dataConnected = false;
+ }
+ // Make sure that all the disconnected actions take place on the data "thread"
+ aio->requestCallback(boost::bind(&RdmaConnector::drained, this));
+}
+
void RdmaConnector::dataError(Rdma::AsynchIO&) {
QPID_LOG(debug, "Data Error " << identifier);
{
- Mutex::ScopedLock l(pollingLock);
+ Mutex::ScopedLock l(dataConnectedLock);
// If we're closed already then we'll get to drained() anyway
- if (!polling) return;
- polling = false;
+ if (!dataConnected) return;
+ dataConnected = false;
}
drained();
}
@@ -278,39 +262,37 @@ void RdmaConnector::dataError(Rdma::AsynchIO&) {
void RdmaConnector::close() {
QPID_LOG(debug, "RdmaConnector::close " << identifier);
{
- Mutex::ScopedLock l(pollingLock);
- if (!polling) return;
- polling = false;
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected) return;
+ dataConnected = false;
}
- if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
+ aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
}
void RdmaConnector::drained() {
QPID_LOG(debug, "RdmaConnector::drained " << identifier);
- assert(!polling);
- if (aio) {
- Rdma::AsynchIO* a = aio;
- aio = 0;
- a->stop(boost::bind(&RdmaConnector::dataStopped, this, a));
- }
+ assert(!dataConnected);
+ assert(aio);
+ Rdma::AsynchIO* a = aio;
+ aio = 0;
+ a->stop(boost::bind(&RdmaConnector::dataStopped, this, a));
}
void RdmaConnector::dataStopped(Rdma::AsynchIO* a) {
QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier);
- assert(!polling);
- aio = 0;
- delete a;
- if (acon) {
- Rdma::Connector* c = acon;
- acon = 0;
- c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c));
- }
+ assert(!dataConnected);
+ assert(acon);
+ Rdma::Connector* c = acon;
+ acon = 0;
+ c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c, a));
}
-void RdmaConnector::connectionStopped(Rdma::Connector* c) {
+void RdmaConnector::connectionStopped(Rdma::Connector* c, Rdma::AsynchIO* a) {
QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier);
- assert(!polling);
+ assert(!dataConnected);
+ aio = 0;
acon = 0;
+ delete a;
delete c;
if (shutdownHandler) {
ShutdownHandler* s = shutdownHandler;
@@ -340,6 +322,10 @@ const std::string& RdmaConnector::getIdentifier() const {
}
void RdmaConnector::send(AMQFrame& frame) {
+ // It is possible that we are called to write after we are already shutting down
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected) return;
+
bool notifyWrite = false;
{
Mutex::ScopedLock l(lock);
@@ -354,15 +340,15 @@ void RdmaConnector::send(AMQFrame& frame) {
notifyWrite = (currentSize >= maxFrameSize);
}
}
- if (notifyWrite && polling) aio->notifyPendingWrite();
+ if (notifyWrite) aio->notifyPendingWrite();
}
// Called in IO thread. (write idle routine)
// This is NOT only called in response to previously calling notifyPendingWrite
void RdmaConnector::writebuff(Rdma::AsynchIO&) {
// It's possible to be disconnected and be writable
- Mutex::ScopedLock l(pollingLock);
- if (!polling)
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected)
return;
Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;