summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/client/RdmaConnector.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/client/RdmaConnector.cpp')
-rw-r--r--qpid/cpp/src/qpid/client/RdmaConnector.cpp124
1 files changed, 55 insertions, 69 deletions
diff --git a/qpid/cpp/src/qpid/client/RdmaConnector.cpp b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
index 5558e27752..79f86d09c2 100644
--- a/qpid/cpp/src/qpid/client/RdmaConnector.cpp
+++ b/qpid/cpp/src/qpid/client/RdmaConnector.cpp
@@ -64,8 +64,8 @@ class RdmaConnector : public Connector, public sys::Codec
framing::ProtocolVersion version;
bool initiated;
- sys::Mutex pollingLock;
- bool polling;
+ sys::Mutex dataConnectedLock;
+ bool dataConnected;
sys::ShutdownHandler* shutdownHandler;
framing::InputHandler* input;
@@ -90,7 +90,7 @@ class RdmaConnector : public Connector, public sys::Codec
void writeDataBlock(const framing::AMQDataBlock& data);
void dataError(Rdma::AsynchIO&);
void drained();
- void connectionStopped(Rdma::Connector* acon);
+ void connectionStopped(Rdma::Connector* acon, Rdma::AsynchIO* aio);
void dataStopped(Rdma::AsynchIO* aio);
std::string identifier;
@@ -144,7 +144,7 @@ RdmaConnector::RdmaConnector(Poller::shared_ptr p,
bounds(cimpl),
version(ver),
initiated(false),
- polling(false),
+ dataConnected(false),
shutdownHandler(0),
aio(0),
acon(0),
@@ -171,14 +171,11 @@ RdmaConnector::~RdmaConnector() {
if (acon) {
acon->stop(deleteConnector);
}
- if (shutdownHandler) {
- shutdownHandler->shutdown();
- }
}
void RdmaConnector::connect(const std::string& host, int port){
- Mutex::ScopedLock l(pollingLock);
- assert(!polling);
+ Mutex::ScopedLock l(dataConnectedLock);
+ assert(!dataConnected);
acon = new Rdma::Connector(
Rdma::ConnectionParams(maxFrameSize, Rdma::DEFAULT_WR_ENTRIES),
@@ -187,8 +184,6 @@ void RdmaConnector::connect(const std::string& host, int port){
boost::bind(&RdmaConnector::disconnected, this),
boost::bind(&RdmaConnector::rejected, this, poller, _1, _2));
- polling = true;
-
SocketAddress sa(host, boost::lexical_cast<std::string>(port));
acon->start(poller, sa);
}
@@ -196,6 +191,8 @@ void RdmaConnector::connect(const std::string& host, int port){
// The following only gets run when connected
void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr ci, const Rdma::ConnectionParams& cp) {
try {
+ Mutex::ScopedLock l(dataConnectedLock);
+ assert(!dataConnected);
Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
aio = new Rdma::AsynchIO(ci->getQueuePair(),
@@ -210,67 +207,54 @@ void RdmaConnector::connected(Poller::shared_ptr poller, Rdma::Connection::intru
writeDataBlock(init);
aio->start(poller);
+
+ dataConnected = true;
+
return;
} catch (const Rdma::Exception& e) {
QPID_LOG(error, "Rdma: Cannot create new connection (Rdma exception): " << e.what());
} catch (const std::exception& e) {
QPID_LOG(error, "Rdma: Cannot create new connection (unknown exception): " << e.what());
}
- {
- Mutex::ScopedLock l(pollingLock);
- assert(polling);
- polling = false;
- }
- connectionStopped(acon);
+ dataConnected = false;
+ connectionStopped(acon, aio);
}
void RdmaConnector::connectionError(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, Rdma::ErrorType) {
QPID_LOG(debug, "Connection Error " << identifier);
- {
- Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to stopped() anyway
- if (!polling) return;
- polling = false;
- }
- connectionStopped(acon);
-}
-
-void RdmaConnector::disconnected() {
- QPID_LOG(debug, "Connection disconnected " << identifier);
- {
- Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to drained() anyway
- if (!polling) return;
- polling = false;
- }
- // Make sure that all the disconnected actions take place on the data "thread"
- aio->requestCallback(boost::bind(&RdmaConnector::drained, this));
+ connectionStopped(acon, aio);
}
// Bizarrely we seem to get rejected events *after* we've already got a connected event for some peer disconnects
// so we need to check whether the data connection is started or not in here
void RdmaConnector::rejected(sys::Poller::shared_ptr, Rdma::Connection::intrusive_ptr, const Rdma::ConnectionParams& cp) {
QPID_LOG(debug, "Connection Rejected " << identifier << ": " << cp.maxRecvBufferSize);
- {
- Mutex::ScopedLock l(pollingLock);
- // If we're closed already then we'll get to stopped() anyway
- if (!polling) return;
- polling = false;
- }
if (dataConnected) {
disconnected();
} else {
- connectionStopped(acon);
+ connectionStopped(acon, aio);
}
}
+void RdmaConnector::disconnected() {
+ QPID_LOG(debug, "Connection disconnected " << identifier);
+ {
+ Mutex::ScopedLock l(dataConnectedLock);
+ // If we're closed already then we'll get to drained() anyway
+ if (!dataConnected) return;
+ dataConnected = false;
+ }
+ // Make sure that all the disconnected actions take place on the data "thread"
+ aio->requestCallback(boost::bind(&RdmaConnector::drained, this));
+}
+
void RdmaConnector::dataError(Rdma::AsynchIO&) {
QPID_LOG(debug, "Data Error " << identifier);
{
- Mutex::ScopedLock l(pollingLock);
+ Mutex::ScopedLock l(dataConnectedLock);
// If we're closed already then we'll get to drained() anyway
- if (!polling) return;
- polling = false;
+ if (!dataConnected) return;
+ dataConnected = false;
}
drained();
}
@@ -278,39 +262,37 @@ void RdmaConnector::dataError(Rdma::AsynchIO&) {
void RdmaConnector::close() {
QPID_LOG(debug, "RdmaConnector::close " << identifier);
{
- Mutex::ScopedLock l(pollingLock);
- if (!polling) return;
- polling = false;
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected) return;
+ dataConnected = false;
}
- if (aio) aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
+ aio->drainWriteQueue(boost::bind(&RdmaConnector::drained, this));
}
void RdmaConnector::drained() {
QPID_LOG(debug, "RdmaConnector::drained " << identifier);
- assert(!polling);
- if (aio) {
- Rdma::AsynchIO* a = aio;
- aio = 0;
- a->stop(boost::bind(&RdmaConnector::dataStopped, this, a));
- }
+ assert(!dataConnected);
+ assert(aio);
+ Rdma::AsynchIO* a = aio;
+ aio = 0;
+ a->stop(boost::bind(&RdmaConnector::dataStopped, this, a));
}
void RdmaConnector::dataStopped(Rdma::AsynchIO* a) {
QPID_LOG(debug, "RdmaConnector::dataStopped " << identifier);
- assert(!polling);
- aio = 0;
- delete a;
- if (acon) {
- Rdma::Connector* c = acon;
- acon = 0;
- c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c));
- }
+ assert(!dataConnected);
+ assert(acon);
+ Rdma::Connector* c = acon;
+ acon = 0;
+ c->stop(boost::bind(&RdmaConnector::connectionStopped, this, c, a));
}
-void RdmaConnector::connectionStopped(Rdma::Connector* c) {
+void RdmaConnector::connectionStopped(Rdma::Connector* c, Rdma::AsynchIO* a) {
QPID_LOG(debug, "RdmaConnector::connectionStopped " << identifier);
- assert(!polling);
+ assert(!dataConnected);
+ aio = 0;
acon = 0;
+ delete a;
delete c;
if (shutdownHandler) {
ShutdownHandler* s = shutdownHandler;
@@ -340,6 +322,10 @@ const std::string& RdmaConnector::getIdentifier() const {
}
void RdmaConnector::send(AMQFrame& frame) {
+ // It is possible that we are called to write after we are already shutting down
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected) return;
+
bool notifyWrite = false;
{
Mutex::ScopedLock l(lock);
@@ -354,15 +340,15 @@ void RdmaConnector::send(AMQFrame& frame) {
notifyWrite = (currentSize >= maxFrameSize);
}
}
- if (notifyWrite && polling) aio->notifyPendingWrite();
+ if (notifyWrite) aio->notifyPendingWrite();
}
// Called in IO thread. (write idle routine)
// This is NOT only called in response to previously calling notifyPendingWrite
void RdmaConnector::writebuff(Rdma::AsynchIO&) {
// It's possible to be disconnected and be writable
- Mutex::ScopedLock l(pollingLock);
- if (!polling)
+ Mutex::ScopedLock l(dataConnectedLock);
+ if (!dataConnected)
return;
Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this;