diff options
author | Andrew Stitcher <astitcher@apache.org> | 2013-06-04 14:27:55 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2013-06-04 14:27:55 +0000 |
commit | 5566d2a9185ade49b5a9e12fc222b8abd73b5fb9 (patch) | |
tree | 3c95ed435fca3589df4297f9495a3ba3a1b98bbd | |
parent | d21252142514d7de4d1279ae43e0e4b67e0321ea (diff) | |
download | qpid-python-5566d2a9185ade49b5a9e12fc222b8abd73b5fb9.tar.gz |
QPID-4854: Make the protocol negotiation timeout actually relate to
the protocol negotiation!
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1489458 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/amqp_0_10/Connection.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/amqp_0_10/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/amqp/Connection.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/SslTransport.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/messaging/amqp/TcpTransport.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.cpp | 18 | ||||
-rw-r--r-- | cpp/src/qpid/sys/AsynchIOHandler.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/OutputControl.h | 1 | ||||
-rw-r--r-- | cpp/src/qpid/sys/RdmaIOPlugin.cpp | 5 |
12 files changed, 24 insertions, 11 deletions
diff --git a/cpp/src/qpid/amqp_0_10/Connection.cpp b/cpp/src/qpid/amqp_0_10/Connection.cpp index bb30ece285..c95d1a2d1e 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -118,6 +118,7 @@ size_t Connection::encode(char* buffer, size_t size) { } void Connection::abort() { output.abort(); } +void Connection::connectionEstablished() { output.connectionEstablished(); } void Connection::activateOutput() { output.activateOutput(); } void Connection::close() { diff --git a/cpp/src/qpid/amqp_0_10/Connection.h b/cpp/src/qpid/amqp_0_10/Connection.h index 44c4c87ebd..02137fc0f3 100644 --- a/cpp/src/qpid/amqp_0_10/Connection.h +++ b/cpp/src/qpid/amqp_0_10/Connection.h @@ -65,6 +65,7 @@ class Connection : public sys::ConnectionCodec, bool isClosed() const; bool canEncode(); void abort(); + void connectionEstablished(); void activateOutput(); void closed(); // connection closed by peer. void close(); // closing from this end. diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index dc55dce6bb..03b9f1015d 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -456,6 +456,7 @@ void Connection::setHeartbeatInterval(uint16_t heartbeat) timer.add(timeoutTimer); } } + out.connectionEstablished(); } void Connection::startLinkHeartbeatTimeoutTask() { @@ -463,6 +464,7 @@ void Connection::startLinkHeartbeatTimeoutTask() { linkHeartbeatTimer = new LinkHeartbeatTask(timer, 2 * heartbeat * TIME_SEC, *this); timer.add(linkHeartbeatTimer); } + out.connectionEstablished(); } void Connection::restartTimeout() @@ -480,6 +482,7 @@ bool Connection::isOpen() { return adapter.isOpen(); } Connection::OutboundFrameTracker::OutboundFrameTracker(Connection& _con) : con(_con), next(0) {} void Connection::OutboundFrameTracker::close() { next->close(); } void Connection::OutboundFrameTracker::abort() { next->abort(); } +void Connection::OutboundFrameTracker::connectionEstablished() { next->connectionEstablished(); } void Connection::OutboundFrameTracker::activateOutput() { next->activateOutput(); } void Connection::OutboundFrameTracker::send(framing::AMQFrame& f) { diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index ccf10176c1..b1793e0cf4 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -189,6 +189,7 @@ class Connection : public sys::ConnectionInputHandler, OutboundFrameTracker(Connection&); void close(); void abort(); + void connectionEstablished(); void activateOutput(); void send(framing::AMQFrame&); void wrap(sys::ConnectionOutputHandlerPtr&); diff --git a/cpp/src/qpid/broker/amqp/Connection.cpp b/cpp/src/qpid/broker/amqp/Connection.cpp index d706e6e49e..51576e9577 100644 --- a/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/cpp/src/qpid/broker/amqp/Connection.cpp @@ -181,6 +181,7 @@ void Connection::process() pn_connection_set_container(connection, broker.getFederationTag().c_str()); setContainerId(pn_connection_remote_container(connection)); pn_connection_open(connection); + out.connectionEstablished(); } for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) { diff --git a/cpp/src/qpid/messaging/amqp/SslTransport.h b/cpp/src/qpid/messaging/amqp/SslTransport.h index f67ab95673..aad82c2c2a 100644 --- a/cpp/src/qpid/messaging/amqp/SslTransport.h +++ b/cpp/src/qpid/messaging/amqp/SslTransport.h @@ -48,6 +48,7 @@ class SslTransport : public Transport void activateOutput(); void abort(); + void connectionEstablished() {}; void close(); private: diff --git a/cpp/src/qpid/messaging/amqp/TcpTransport.h b/cpp/src/qpid/messaging/amqp/TcpTransport.h index 8c1087abb3..d7adf64f3e 100644 --- a/cpp/src/qpid/messaging/amqp/TcpTransport.h +++ b/cpp/src/qpid/messaging/amqp/TcpTransport.h @@ -48,6 +48,7 @@ class TcpTransport : public Transport void activateOutput(); void abort(); + void connectionEstablished() {}; void close(); private: diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp index cf08b482e6..f102807b5b 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -100,6 +100,13 @@ void AsynchIOHandler::abort() { aio->queueWriteClose(); } +void AsynchIOHandler::connectionEstablished() { + if (timeoutTimerTask) { + timeoutTimerTask->cancel(); + timeoutTimerTask.reset(); + } +} + void AsynchIOHandler::activateOutput() { aio->notifyPendingWrite(); } @@ -123,13 +130,6 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (codec) { // Already initiated try { decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount); - // When we've decoded 3 reads (probably frames) we will have authenticated and - // started heartbeats, if specified, in many (but not all) cases so now we will cancel - // the idle connection timeout - this is really hacky, and would be better implemented - // in the connection, but that isn't actually created until the first decode. - if (reads == 3) { - timeoutTimerTask->cancel(); - } }catch(const std::exception& e){ QPID_LOG(error, e.what()); readError = true; @@ -203,10 +203,6 @@ void AsynchIOHandler::idle(AsynchIO&){ if (isClient && codec == 0) { codec = factory->create(*this, identifier, getSecuritySettings(aio, nodict)); write(framing::ProtocolInitiation(codec->getVersion())); - // We've just sent the protocol negotiation so we can cancel the timeout for that - // This is not ideal, because we've not received anything yet, but heartbeats will - // be active soon - timeoutTimerTask->cancel(); return; } if (codec == 0) return; diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h index d93e24fd4c..c99c5897c6 100644 --- a/cpp/src/qpid/sys/AsynchIOHandler.h +++ b/cpp/src/qpid/sys/AsynchIOHandler.h @@ -63,6 +63,7 @@ class AsynchIOHandler : public OutputControl { // Output side QPID_COMMON_EXTERN void abort(); + QPID_COMMON_EXTERN void connectionEstablished(); QPID_COMMON_EXTERN void activateOutput(); // Input side diff --git a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h index 4a891ec62f..396c340c11 100644 --- a/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h +++ b/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h @@ -43,6 +43,7 @@ class ConnectionOutputHandlerPtr : public ConnectionOutputHandler void close() { next->close(); } void abort() { next->abort(); } + void connectionEstablished() { next->connectionEstablished(); } void activateOutput() { next->activateOutput(); } void send(framing::AMQFrame& f) { next->send(f); } diff --git a/cpp/src/qpid/sys/OutputControl.h b/cpp/src/qpid/sys/OutputControl.h index 0d801e9d16..f990594637 100644 --- a/cpp/src/qpid/sys/OutputControl.h +++ b/cpp/src/qpid/sys/OutputControl.h @@ -32,6 +32,7 @@ namespace sys { public: virtual ~OutputControl() {} virtual void abort() = 0; + virtual void connectionEstablished() = 0; virtual void activateOutput() = 0; }; diff --git a/cpp/src/qpid/sys/RdmaIOPlugin.cpp b/cpp/src/qpid/sys/RdmaIOPlugin.cpp index 8655a8baa3..31efd6753f 100644 --- a/cpp/src/qpid/sys/RdmaIOPlugin.cpp +++ b/cpp/src/qpid/sys/RdmaIOPlugin.cpp @@ -67,6 +67,7 @@ class RdmaIOHandler : public OutputControl { // Output side void close(); void abort(); + void connectionEstablished(); void activateOutput(); void initProtocolOut(); @@ -131,6 +132,10 @@ void RdmaIOHandler::close() { void RdmaIOHandler::abort() { } +// TODO: Dummy implementation, need to fill this in for connection establishment timeout to work +void RdmaIOHandler::connectionEstablished() { +} + void RdmaIOHandler::activateOutput() { aio->notifyPendingWrite(); } |