summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-06-04 14:27:55 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-06-04 14:27:55 +0000
commit5566d2a9185ade49b5a9e12fc222b8abd73b5fb9 (patch)
tree3c95ed435fca3589df4297f9495a3ba3a1b98bbd
parentd21252142514d7de4d1279ae43e0e4b67e0321ea (diff)
downloadqpid-python-5566d2a9185ade49b5a9e12fc222b8abd73b5fb9.tar.gz
QPID-4854: Make the protocol negotiation timeout actually relate to
the protocol negotiation! git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1489458 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.cpp1
-rw-r--r--cpp/src/qpid/amqp_0_10/Connection.h1
-rw-r--r--cpp/src/qpid/broker/Connection.cpp3
-rw-r--r--cpp/src/qpid/broker/Connection.h1
-rw-r--r--cpp/src/qpid/broker/amqp/Connection.cpp1
-rw-r--r--cpp/src/qpid/messaging/amqp/SslTransport.h1
-rw-r--r--cpp/src/qpid/messaging/amqp/TcpTransport.h1
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp18
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h1
-rw-r--r--cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h1
-rw-r--r--cpp/src/qpid/sys/OutputControl.h1
-rw-r--r--cpp/src/qpid/sys/RdmaIOPlugin.cpp5
12 files changed, 24 insertions, 11 deletions
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp
index bb30ece285..c95d1a2d1e 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.cpp
+++ b/cpp/src/qpid/amqp_0_10/Connection.cpp
@@ -118,6 +118,7 @@ size_t Connection::encode(char* buffer, size_t size) {
}
void Connection::abort() { output.abort(); }
+void Connection::connectionEstablished() { output.connectionEstablished(); }
void Connection::activateOutput() { output.activateOutput(); }
void Connection::close() {
diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h
index 44c4c87ebd..02137fc0f3 100644
--- a/cpp/src/qpid/amqp_0_10/Connection.h
+++ b/cpp/src/qpid/amqp_0_10/Connection.h
@@ -65,6 +65,7 @@ class Connection : public sys::ConnectionCodec,
bool isClosed() const;
bool canEncode();
void abort();
+ void connectionEstablished();
void activateOutput();
void closed(); // connection closed by peer.
void close(); // closing from this end.
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index dc55dce6bb..03b9f1015d 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -456,6 +456,7 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat)
timer.add(timeoutTimer);
}
}
+ out.connectionEstablished();
}
void Connection::startLinkHeartbeatTimeoutTask() {
@@ -463,6 +464,7 @@ void Connection::startLinkHeartbeatTimeoutTask() {
linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this);
timer.add(linkHeartbeatTimer);
}
+ out.connectionEstablished();
}
void Connection::restartTimeout()
@@ -480,6 +482,7 @@ bool Connection::isOpen() { return adapter.isOpen(); }
Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {}
void Connection::OutboundFrameTracker::close() { next->close(); }
void Connection::OutboundFrameTracker::abort() { next->abort(); }
+void Connection::OutboundFrameTracker::connectionEstablished() { next->connectionEstablished(); }
void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); }
void Connection::OutboundFrameTracker::send(framing::AMQFrame& f)
{
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index ccf10176c1..b1793e0cf4 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -189,6 +189,7 @@ class Connection : public sys::ConnectionInputHandler,
OutboundFrameTracker(Connection&);
void close();
void abort();
+ void connectionEstablished();
void activateOutput();
void send(framing::AMQFrame&);
void wrap(sys::ConnectionOutputHandlerPtr&);
diff --git a/cpp/src/qpid/broker/amqp/Connection.cpp b/cpp/src/qpid/broker/amqp/Connection.cpp
index d706e6e49e..51576e9577 100644
--- a/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -181,6 +181,7 @@ void Connection::process()
pn_connection_set_container(connection, broker.getFederationTag().c_str());
setContainerId(pn_connection_remote_container(connection));
pn_connection_open(connection);
+ out.connectionEstablished();
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
diff --git a/cpp/src/qpid/messaging/amqp/SslTransport.h b/cpp/src/qpid/messaging/amqp/SslTransport.h
index f67ab95673..aad82c2c2a 100644
--- a/cpp/src/qpid/messaging/amqp/SslTransport.h
+++ b/cpp/src/qpid/messaging/amqp/SslTransport.h
@@ -48,6 +48,7 @@ class SslTransport : public Transport
void activateOutput();
void abort();
+ void connectionEstablished() {};
void close();
private:
diff --git a/cpp/src/qpid/messaging/amqp/TcpTransport.h b/cpp/src/qpid/messaging/amqp/TcpTransport.h
index 8c1087abb3..d7adf64f3e 100644
--- a/cpp/src/qpid/messaging/amqp/TcpTransport.h
+++ b/cpp/src/qpid/messaging/amqp/TcpTransport.h
@@ -48,6 +48,7 @@ class TcpTransport : public Transport
void activateOutput();
void abort();
+ void connectionEstablished() {};
void close();
private:
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index cf08b482e6..f102807b5b 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -100,6 +100,13 @@ void AsynchIOHandler::abort() {
aio->queueWriteClose();
}
+void AsynchIOHandler::connectionEstablished() {
+ if (timeoutTimerTask) {
+ timeoutTimerTask->cancel();
+ timeoutTimerTask.reset();
+ }
+}
+
void AsynchIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
@@ -123,13 +130,6 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (codec) { // Already initiated
try {
decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
- // When we've decoded 3 reads (probably frames) we will have authenticated and
- // started heartbeats, if specified, in many (but not all) cases so now we will cancel
- // the idle connection timeout - this is really hacky, and would be better implemented
- // in the connection, but that isn't actually created until the first decode.
- if (reads == 3) {
- timeoutTimerTask->cancel();
- }
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
@@ -203,10 +203,6 @@ void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
codec = factory->create(*this, identifier, getSecuritySettings(aio, nodict));
write(framing::ProtocolInitiation(codec->getVersion()));
- // We've just sent the protocol negotiation so we can cancel the timeout for that
- // This is not ideal, because we've not received anything yet, but heartbeats will
- // be active soon
- timeoutTimerTask->cancel();
return;
}
if (codec == 0) return;
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index d93e24fd4c..c99c5897c6 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -63,6 +63,7 @@ class AsynchIOHandler : public OutputControl {
// Output side
QPID_COMMON_EXTERN void abort();
+ QPID_COMMON_EXTERN void connectionEstablished();
QPID_COMMON_EXTERN void activateOutput();
// Input side
diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
index 4a891ec62f..396c340c11 100644
--- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
+++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
@@ -43,6 +43,7 @@ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
void close() { next->close(); }
void abort() { next->abort(); }
+ void connectionEstablished() { next->connectionEstablished(); }
void activateOutput() { next->activateOutput(); }
void send(framing::AMQFrame& f) { next->send(f); }
diff --git a/cpp/src/qpid/sys/OutputControl.h b/cpp/src/qpid/sys/OutputControl.h
index 0d801e9d16..f990594637 100644
--- a/cpp/src/qpid/sys/OutputControl.h
+++ b/cpp/src/qpid/sys/OutputControl.h
@@ -32,6 +32,7 @@ namespace sys {
public:
virtual ~OutputControl() {}
virtual void abort() = 0;
+ virtual void connectionEstablished() = 0;
virtual void activateOutput() = 0;
};
diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
index 8655a8baa3..31efd6753f 100644
--- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp
+++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp
@@ -67,6 +67,7 @@ class RdmaIOHandler : public OutputControl {
// Output side
void close();
void abort();
+ void connectionEstablished();
void activateOutput();
void initProtocolOut();
@@ -131,6 +132,10 @@ void RdmaIOHandler::close() {
void RdmaIOHandler::abort() {
}
+// TODO: Dummy implementation, need to fill this in for connection establishment timeout to work
+void RdmaIOHandler::connectionEstablished() {
+}
+
void RdmaIOHandler::activateOutput() {
aio->notifyPendingWrite();
}