summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2010-10-12 16:05:05 +0000
committerAndrew Stitcher <astitcher@apache.org>2010-10-12 16:05:05 +0000
commit70ef9a1f03a085e19f4663a8877eea8dcefb7842 (patch)
treed28654f4e6cdff41ceef0526fc90e805dd2cea01 /cpp/src
parentd862001d16a4ab7ba9490e99d563fb3d86eb2b3c (diff)
downloadqpid-python-70ef9a1f03a085e19f4663a8877eea8dcefb7842.tar.gz
Delay deleting the Rdma::AsynchIO associated with a Connection to just before
the callback to the ConnectionImpl shutdown function so that we make the possibility of race between a write coming down and deleting it as small as possible. Rearranged scope of polling boolean to indicate that the data channel is connected (or not) and changed name to better describe its function git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1021821 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/RdmaConnector.cpp124
1 files changed, 55 insertions, 69 deletions
diff --git a/cpp/src/qpid/client/RdmaConnector.cpp b/cpp/src/qpid/client/RdmaConnector.cpp
index 5558e27752..79f86d09c2 100644
--- a/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/cpp/src/qpid/client/RdmaConnector.cpp
@@ -64,8 +64,8 @@ class RdmaConnector : public Connector, public sys::Codec
framing::ProtocolVersion version;
bool initiated;
- sys::Mutex pollingLock;
- bool polling;
+ sys::Mutex dataConnectedLock;
+ bool dataConnected;
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
@@ -90,7 +90,7 @@ class RdmaConnector : public Connector, public sys::Codec
void writeDataBlock(const framing::AMQDataBlock& data);
void dataError(Rdma::AsynchIO&);
void drained();
- void connectionStopped(Rdma::Connector* acon);
+ void connectionStopped(Rdma::Connector* acon, Rdma::AsynchIO* aio);
void dataStopped(Rdma::AsynchIO* aio);
std::string identifier;
@@ -144,7 +144,7 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p,
bounds(cimpl),
version(ver),
initiated(false),
- polling(false),
+ dataConnected(false),
shutdownHandler(0),
aio(0),
acon(0),
@@ -171,14 +171,11 @@ RdmaConnector::~RdmaConnector() {
if (acon) {
acon->stop(deleteConnector);
}
- if (shutdownHandler) {
- shutdownHandler->shutdown();
- }
}
void RdmaConnector::connect(const std::string& host, int port){
- Mutex::ScopedLock l(pollingLock);
- assert(!polling);
+ Mutex::ScopedLock l(dataConnectedLock);
+ assert(!dataConnected);
acon = new Rdma::Connector(
Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
@@ -187,8 +184,6 @@ void RdmaConnector::connect(const std::string& host, int port){
boost::bind(&RdmaConnector::disconnected, this),
boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
- polling = true;
-
SocketAddress sa(host, boost::lexical_cast<std::string>(port));
acon->start(poller, sa);
}
@@ -196,6 +191,8 @@ void RdmaConnector::connect(const std::string& host, int port){
// The following only gets run when connected
void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp) {
try {
+ Mutex::ScopedLock l(dataConnectedLock);
+ assert(!dataConnected);
Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
aio = new Rdma::AsynchIO(ci->getQueuePair(),
@@ -210,67 +207,54 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru
writeDataBlock(init);
aio->start(poller);
+
+ dataConnected = true;
+
return;
} catch (const Rdma::Exception& e) {
QPID_LOG(error, "Rdma: Cannot create new connection (Rdma exception): " << e.what());
} catch (const std::exception& e) {
QPID_LOG(error, "Rdma: Cannot create new connection (unknown exception): " << e.what());
}
- {
- Mutex::ScopedLock l(pollingLock);
- assert(polling);
- polling = false;
- }
- connectionStopped(acon);
+ dataConnected = false;
+ connectionStopped(acon, aio);
}
void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) {
QPID_LOG(debug, "Connection Error " << identifier);
- {
- Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to stopped() anyway
- if (!polling) return;
- polling = false;
- }
- connectionStopped(acon);
-}
-
-void RdmaConnector::disconnected() {
- QPID_LOG(debug, "Connection disconnected " << identifier);
- {
- Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to drained() anyway
- if (!polling) return;
- polling = false;
- }
- // Make sure that all the disconnected actions take place on the data "thread"
- aio->requestCallback(boost::bind(&RdmaConnector::drained, this));
+ connectionStopped(acon, aio);
}
// Bizarrely we seem to get rejected events *after* we've already got a connected event for some peer disconnects
// so we need to check whether the data connection is started or not in here
void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) {
QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
- {
- Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to stopped() anyway
- if (!polling) return;
- polling = false;
- }
if (dataConnected) {
disconnected();
} else {
- connectionStopped(acon);
+ connectionStopped(acon, aio);
}
}
+void RdmaConnector::disconnected() {
+ QPID_LOG(debug, "Connection disconnected " << identifier);
+ {
+ Mutex::ScopedLock l(dataConnectedLock);
+ // If we're closed already then we'll get to drained() anyway
+ if (!dataConnected) return;
+ dataConnected = false;
+ }
+ // Make sure that all the disconnected actions take place on the data "thread"
+ aio->requestCallback(boost::bind(&RdmaConnector::drained, this));
+}
+
void RdmaConnector::dataError(Rdma::AsynchIO&) {
QPID_LOG(debug, "Data Error " << identifier);
{
- Mutex::ScopedLock l(pollingLock);
+ Mutex::ScopedLock l(dataConnectedLock);
// If we're closed already then we'll get to drained() anyway
- if (!polling) return;
- polling = false;
+ if (!dataConnected) return;
+ dataConnected = false;
}
drained();
}
@@ -278,39 +262,37 @@ void RdmaConnector::dataError(Rdma::AsynchIO&) {
void RdmaConnector::close() {
QPID_LOG(debug, "RdmaConnector::close " << identifier);
{
- Mutex::ScopedLock l(pollingLock);
- if (!polling) return;
- polling = false;
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected) return;
+ dataConnected = false;
}
- if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
+ aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
}
void RdmaConnector::drained() {
QPID_LOG(debug, "RdmaConnector::drained " << identifier);
- assert(!polling);
- if (aio) {
- Rdma::AsynchIO* a = aio;
- aio = 0;
- a->stop(boost::bind(&RdmaConnector::dataStopped, this, a));
- }
+ assert(!dataConnected);
+ assert(aio);
+ Rdma::AsynchIO* a = aio;
+ aio = 0;
+ a->stop(boost::bind(&RdmaConnector::dataStopped, this, a));
}
void RdmaConnector::dataStopped(Rdma::AsynchIO* a) {
QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier);
- assert(!polling);
- aio = 0;
- delete a;
- if (acon) {
- Rdma::Connector* c = acon;
- acon = 0;
- c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c));
- }
+ assert(!dataConnected);
+ assert(acon);
+ Rdma::Connector* c = acon;
+ acon = 0;
+ c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c, a));
}
-void RdmaConnector::connectionStopped(Rdma::Connector* c) {
+void RdmaConnector::connectionStopped(Rdma::Connector* c, Rdma::AsynchIO* a) {
QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier);
- assert(!polling);
+ assert(!dataConnected);
+ aio = 0;
acon = 0;
+ delete a;
delete c;
if (shutdownHandler) {
ShutdownHandler* s = shutdownHandler;
@@ -340,6 +322,10 @@ const std::string& RdmaConnector::getIdentifier() const {
}
void RdmaConnector::send(AMQFrame& frame) {
+ // It is possible that we are called to write after we are already shutting down
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected) return;
+
bool notifyWrite = false;
{
Mutex::ScopedLock l(lock);
@@ -354,15 +340,15 @@ void RdmaConnector::send(AMQFrame& frame) {
notifyWrite = (currentSize >= maxFrameSize);
}
}
- if (notifyWrite && polling) aio->notifyPendingWrite();
+ if (notifyWrite) aio->notifyPendingWrite();
}
// Called in IO thread. (write idle routine)
// This is NOT only called in response to previously calling notifyPendingWrite
void RdmaConnector::writebuff(Rdma::AsynchIO&) {
// It's possible to be disconnected and be writable
- Mutex::ScopedLock l(pollingLock);
- if (!polling)
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected)
return;
Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;