diff options
| author | Ken Giusti <kgiusti@apache.org> | 2015-02-02 16:05:43 +0000 |
|---|---|---|
| committer | Ken Giusti <kgiusti@apache.org> | 2015-02-02 16:05:43 +0000 |
| commit | d567756feadab00dc6c9ef96a9f93aa210005632 (patch) | |
| tree | c8ad731e6fd5dead8d31a5dbfe83eaec6818008d /qpid/cpp/src | |
| parent | 61753c78d5b7551a53b614a093e2e5483af4289b (diff) | |
| download | qpid-python-d567756feadab00dc6c9ef96a9f93aa210005632.tar.gz | |
QPID-5538: Implement AMQP 1.0 connection idle-timeout.
Original patch by Gordon Sim.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1656505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.cpp | 70 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 137 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/Transport.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/CMakeLists.txt | 1 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/idle_timeout_tests.py | 95 |
9 files changed, 290 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 3a93e2aac5..076af79411 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -121,7 +121,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker connection(pn_connection()), transport(pn_transport()), collector(0), - out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false) + out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false), ioRequested(false) { #ifdef HAVE_PROTON_EVENTS collector = pn_collector(); @@ -157,6 +157,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker void Connection::requestIO() { + ioRequested = true; out.activateOutput(); } @@ -179,13 +180,24 @@ size_t Connection::decode(const char* buffer, size_t size) { QPID_LOG(trace, id << " decode(" << size << ")"); if (size == 0) return 0; - //TODO: Fix pn_engine_input() to take const buffer + ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size); if (n > 0 || n == PN_EOS) { - //If engine returns EOS, have no way of knowing how many bytes - //it processed, but can assume none need to be reprocessed so - //consider them all read: - if (n == PN_EOS) n = size; + // PN_EOS either means we received a Close (which also means we've + // consumed all the input), OR some Very Bad Thing happened and this + // connection is toast. + if (n == PN_EOS) + { + std::string error; + if (checkTransportError(error)) { + // "He's dead, Jim." + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + out.abort(); + return 0; + } else { + n = size; // assume all consumed + } + } QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size); try { process(); @@ -224,6 +236,15 @@ size_t Connection::encode(char* buffer, size_t size) QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size) haveOutput = true; return n; + } else if (n == PN_EOS) { + haveOutput = false; + // Normal close, or error? + std::string error; + if (checkTransportError(error)) { + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + out.abort(); + } + return 0; } else if (n == PN_ERR) { throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, QPID_MSG("Error on output: " << getError())); } else { @@ -291,6 +312,7 @@ bool Connection::canEncode() } else { QPID_LOG(info, "Connection " << id << " has been closed locally"); } + if (ioRequested.valueCompareAndSwap(true, false)) haveOutput = true; pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput) return haveOutput; @@ -303,8 +325,22 @@ void Connection::open() pn_connection_set_container(connection, getBroker().getFederationTag().c_str()); uint32_t timeout = pn_transport_get_remote_idle_timeout(transport); if (timeout) { - ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(timeout, getBroker().getTimer(), *this)); - pn_transport_set_idle_timeout(transport, timeout); + // if idle generate empty frames at 1/2 the timeout interval as keepalives: + ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask((timeout+1)/2, + getBroker().getTimer(), + *this)); + getBroker().getTimer().add(ticker); + + // Note: in version 0-10 of the protocol, idle timeout applies to both + // ends. AMQP 1.0 changes that - it's now asymmetric: each end can + // configure/disable it independently. For backward compatibility, by + // default mimic the old behavior and set our local timeout. + // Use 2x the remote's timeout, as per the spec the remote should + // advertise 1/2 its actual timeout threshold + pn_transport_set_idle_timeout(transport, timeout * 2); + QPID_LOG_CAT(debug, network, id << " AMQP 1.0 idle-timeout set:" + << " local=" << pn_transport_get_idle_timeout(transport) + << " remote=" << pn_transport_get_remote_idle_timeout(transport)); } pn_connection_open(connection); @@ -585,4 +621,22 @@ void Connection::doDeliveryUpdated(pn_delivery_t *delivery) } } +// check for failures of the transport: +bool Connection::checkTransportError(std::string& text) +{ + std::stringstream info; + +#ifdef USE_PROTON_TRANSPORT_CONDITION + pn_condition_t* tcondition = pn_transport_condition(transport); + if (pn_condition_is_set(tcondition)) + info << "transport error: " << pn_condition_get_name(tcondition) << ", " << pn_condition_get_description(tcondition); +#else + pn_error_t* terror = pn_transport_error(transport); + if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]"; +#endif + + text = info.str(); + return !text.empty(); +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index ea4ce06163..e97d041c03 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -24,6 +24,7 @@ #include "qpid/sys/ConnectionCodec.h" #include "qpid/broker/amqp/BrokerContext.h" #include "qpid/broker/amqp/ManagedConnection.h" +#include "qpid/sys/AtomicValue.h" #include <map> #include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> @@ -80,6 +81,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man bool closeInitiated; bool closeRequested; boost::intrusive_ptr<sys::TimerTask> ticker; + qpid::sys::AtomicValue<bool> ioRequested; virtual void process(); void doOutput(size_t); @@ -92,6 +94,8 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man void closedByManagement(); private: + bool checkTransportError(std::string&); + // handle Proton engine events void doConnectionRemoteOpen(); void doConnectionRemoteClose(); @@ -100,7 +104,6 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man void doLinkRemoteOpen(pn_link_t *link); void doLinkRemoteClose(pn_link_t *link); void doDeliveryUpdated(pn_delivery_t *delivery); - }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index fedab4286f..0969f76ae4 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -39,6 +39,7 @@ #include "qpid/sys/SecurityLayer.h" #include "qpid/sys/SystemInfo.h" #include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" #include "qpid/sys/urlAdd.h" #include "config.h" #include <boost/lexical_cast.hpp> @@ -95,6 +96,27 @@ std::string get_error(pn_connection_t* connection, pn_transport_t* transport) } #endif +class ConnectionTickerTask : public qpid::sys::TimerTask +{ + qpid::sys::Timer& timer; + ConnectionContext& connection; + public: + ConnectionTickerTask(const qpid::sys::Duration& interval, qpid::sys::Timer& t, ConnectionContext& c) : + TimerTask(interval, "ConnectionTicker"), + timer(t), + connection(c) + {} + + void fire() { + QPID_LOG(debug, "ConnectionTickerTask fired"); + // Setup next firing + setupNextFire(); + timer.add(this); + + // Send Ticker + connection.activateOutput(); + } +}; } void ConnectionContext::trace(const char* message) const @@ -118,23 +140,15 @@ ConnectionContext::ConnectionContext(const std::string& url, const qpid::types:: // Concatenate all known URLs into a single URL, get rid of duplicate addresses. sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ? qpid::Address::TCP : protocol); - if (pn_transport_bind(engine, connection)) { - //error - } if (identifier.empty()) { identifier = qpid::types::Uuid(true).str(); } - pn_connection_set_container(connection, identifier.c_str()); - bool enableTrace(false); - QPID_LOG_TEST_CAT(trace, protocol, enableTrace); - if (enableTrace) { - pn_transport_trace(engine, PN_TRACE_FRM); - set_tracer(engine, this); - } + configureConnection(); } ConnectionContext::~ConnectionContext() { + if (ticker) ticker->cancel(); close(); sessions.clear(); pn_transport_free(engine); @@ -218,6 +232,10 @@ void ConnectionContext::close() lock.wait(); } } + if (ticker) { + ticker->cancel(); + ticker.reset(); + } } bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout) @@ -498,7 +516,7 @@ uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> rece void ConnectionContext::activateOutput() { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - wakeupDriver(); + if (state == CONNECTED) wakeupDriver(); } /** * Expects lock to be held by caller @@ -530,14 +548,11 @@ void ConnectionContext::reset() engine = pn_transport(); connection = pn_connection(); - pn_connection_set_container(connection, identifier.c_str()); - bool enableTrace(false); - QPID_LOG_TEST_CAT(trace, protocol, enableTrace); - if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM); + configureConnection(); + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { i->second->reset(connection); } - pn_transport_bind(engine, connection); } void ConnectionContext::check() { @@ -758,12 +773,23 @@ std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) //TODO: Fix pn_engine_input() to take const buffer ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size); if (n > 0 || n == PN_EOS) { - //If engine returns EOS, have no way of knowing how many bytes - //it processed, but can assume none need to be reprocessed so - //consider them all read: - if (n == PN_EOS) n = size; + // PN_EOS either means we received a Close (which also means we've + // consumed all the input), OR some Very Bad Thing happened and this + // connection is toast. + if (n == PN_EOS) + { + std::string error; + if (checkTransportError(error)) { + // "He's dead, Jim." + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + transport->abort(); + return 0; + } else { + n = size; // assume all consumed + } + } QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size) - pn_transport_tick(engine, 0); + pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); lock.notifyAll(); return n; } else if (n == PN_ERR) { @@ -795,7 +821,13 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) throw MessagingException(QPID_MSG("Error on output: " << getError())); } else if (n == PN_EOS) { haveOutput = false; - return 0;//Is this right? + // Normal close, or error? + std::string error; + if (checkTransportError(error)) { + QPID_LOG_CAT(error, network, id << " connection failed: " << error); + transport->abort(); + } + return 0; } else { haveOutput = false; return 0; @@ -804,6 +836,7 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) bool ConnectionContext::canEncodePlain() { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); return haveOutput && state == CONNECTED; } void ConnectionContext::closed() @@ -1061,7 +1094,6 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) { } QPID_LOG(debug, id << " Opening..."); - setProperties(); pn_connection_open(connection); wakeupDriver(); //want to write while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) && @@ -1071,6 +1103,25 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) { if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { throw qpid::messaging::ConnectionError("Failed to open connection"); } + + // Connection open - check for idle timeout from the remote and start a + // periodic tick to monitor for idle connections + pn_timestamp_t remote = pn_transport_get_remote_idle_timeout(engine); + pn_timestamp_t local = pn_transport_get_idle_timeout(engine); + uint64_t shortest = ((remote && local) + ? std::min(remote, local) + : (remote) ? remote : local); + if (shortest) { + // send an idle frame at least twice before timeout + shortest = (shortest + 1)/2; + qpid::sys::Duration d(shortest * qpid::sys::TIME_MSEC); + ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(d, driver->getTimer(), *this)); + driver->getTimer().add(ticker); + QPID_LOG(debug, id << " AMQP 1.0 idle-timeout set:" + << " local=" << pn_transport_get_idle_timeout(engine) + << " remote=" << pn_transport_get_remote_idle_timeout(engine)); + } + QPID_LOG(debug, id << " Opened"); return restartSessions(); @@ -1151,4 +1202,44 @@ bool ConnectionContext::CodecAdapter::canEncode() } +// setup the transport and connection objects: +void ConnectionContext::configureConnection() +{ + pn_connection_set_container(connection, identifier.c_str()); + setProperties(); + if (heartbeat) { + // fail an idle connection at 2 x heartbeat (in msecs) + pn_transport_set_idle_timeout(engine, heartbeat*2*1000); + } + + bool enableTrace(false); + QPID_LOG_TEST_CAT(trace, protocol, enableTrace); + if (enableTrace) { + pn_transport_trace(engine, PN_TRACE_FRM); + set_tracer(engine, this); + } + + int err = pn_transport_bind(engine, connection); + if (err) + QPID_LOG(error, id << " Error binding connection and transport: " << err); +} + + +// check for failures of the transport: +bool ConnectionContext::checkTransportError(std::string& text) +{ + std::stringstream info; + +#ifdef USE_PROTON_TRANSPORT_CONDITION + pn_condition_t* tcondition = pn_transport_condition(engine); + if (pn_condition_is_set(tcondition)) + info << "transport error: " << pn_condition_get_name(tcondition) << ", " << pn_condition_get_description(tcondition); +#else + pn_error_t* terror = pn_transport_error(engine); + if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]"; +#endif + + text = info.str(); + return !text.empty(); +} }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 12f3c21e0a..80da9dff10 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -25,6 +25,7 @@ #include <map> #include <memory> #include <string> +#include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> #include "qpid/Url.h" #include "qpid/messaging/ConnectionOptions.h" @@ -47,6 +48,7 @@ class ProtocolVersion; namespace sys { class SecurityLayer; struct SecuritySettings; +class TimerTask; } namespace messaging { class Duration; @@ -120,7 +122,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void initSecurityLayer(qpid::sys::SecurityLayer&); void trace(const char*) const; - private: + private: typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap; class CodecAdapter : public qpid::sys::Codec { @@ -155,6 +157,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag std::auto_ptr<Sasl> sasl; CodecAdapter codecAdapter; bool notifyOnWrite; + boost::intrusive_ptr<qpid::sys::TimerTask> ticker; void check(); bool checkDisconnected(); @@ -191,6 +194,8 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag std::string getError(); bool useSasl(); void setProperties(); + void configureConnection(); + bool checkTransportError(std::string&); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp index 16307b3c22..ebe3fff1cb 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp @@ -22,13 +22,14 @@ #include "Transport.h" #include "qpid/messaging/exceptions.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/Timer.h" #include "qpid/log/Statement.h" namespace qpid { namespace messaging { namespace amqp { -DriverImpl::DriverImpl() : poller(new qpid::sys::Poller) +DriverImpl::DriverImpl() : poller(new qpid::sys::Poller), timer(new qpid::sys::Timer) { start(); } @@ -48,6 +49,7 @@ void DriverImpl::stop() QPID_LOG(debug, "Driver stopped"); poller->shutdown(); thread.join(); + timer->stop(); } boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection) diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h index 354fa1ae35..36cb196343 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h +++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h @@ -29,6 +29,7 @@ namespace qpid { namespace sys { class Poller; +class Timer; } namespace messaging { namespace amqp { @@ -47,11 +48,14 @@ class DriverImpl void stop(); boost::shared_ptr<Transport> getTransport(const std::string& protocol, TransportContext& connection); + sys::Timer& getTimer() { return *timer; } static boost::shared_ptr<DriverImpl> getDefault(); private: boost::shared_ptr<qpid::sys::Poller> poller; qpid::sys::Thread thread; + std::auto_ptr<sys::Timer> timer; + static qpid::sys::Mutex defaultLock; static boost::weak_ptr<DriverImpl> theDefault; }; diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transport.h b/qpid/cpp/src/qpid/messaging/amqp/Transport.h index 159916f9ae..6ec99ab58f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/Transport.h +++ b/qpid/cpp/src/qpid/messaging/amqp/Transport.h @@ -40,6 +40,7 @@ class Transport : public qpid::sys::OutputControl virtual ~Transport() {} virtual void connect(const std::string& host, const std::string& port) = 0; virtual void close() = 0; + virtual void abort() = 0; virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0; typedef Transport* Factory(TransportContext&, boost::shared_ptr<qpid::sys::Poller>); diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index 08a8c69d69..c914c50e33 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -364,6 +364,7 @@ add_test (ha_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py) add_test (qpidd_qmfv2_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py) if (BUILD_AMQP) add_test (interlink_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py) + add_test (idle_timeout_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/idle_timeout_tests.py) endif (BUILD_AMQP) add_test (swig_python_tests ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/swig_python_tests${test_script_suffix}) add_test (ipv6_test ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix}) diff --git a/qpid/cpp/src/tests/idle_timeout_tests.py b/qpid/cpp/src/tests/idle_timeout_tests.py new file mode 100755 index 0000000000..c3cc00746b --- /dev/null +++ b/qpid/cpp/src/tests/idle_timeout_tests.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +import shutil +import signal +import sys + +from brokertest import * + +class AmqpIdleTimeoutTest(BrokerTest): + """ + Test AMQP 1.0 idle-timeout support + """ + def setUp(self): + BrokerTest.setUp(self) + if not BrokerTest.amqp_lib: + raise Skipped("AMQP 1.0 library not found") + if qm != qpid_messaging: + raise Skipped("AMQP 1.0 client not found") + self._broker = self.broker() + + def test_client_timeout(self): + """Ensure that the client disconnects should the broker stop + responding. + """ + conn = self._broker.connect(native=False, timeout=None, + protocol="amqp1.0", heartbeat=1) + self.assertTrue(conn.isOpen()) + # should disconnect within 2 seconds of broker stop + deadline = time.time() + 8 + os.kill(self._broker.pid, signal.SIGSTOP) + while time.time() < deadline: + if not conn.isOpen(): + break; + self.assertTrue(not conn.isOpen()) + os.kill(self._broker.pid, signal.SIGCONT) + + + def test_broker_timeout(self): + """By default, the broker will adopt the same timeout as the client + (mimics the 0-10 timeout behavior). Verify the broker disconnects + unresponsive clients. + """ + + count = len(self._broker.agent.getAllConnections()) + + # Create a new connection to the broker: + receiver_cmd = ["qpid-receive", + "--broker", self._broker.host_port(), + "--address=amq.fanout", + "--connection-options={protocol:amqp1.0, heartbeat:1}", + "--forever"] + receiver = self.popen(receiver_cmd, stdout=PIPE, stderr=PIPE, + expect=EXPECT_UNKNOWN) + start = time.time() + deadline = time.time() + 10 + while time.time() < deadline: + if count < len(self._broker.agent.getAllConnections()): + break; + self.assertTrue(count < len(self._broker.agent.getAllConnections())) + + # now 'hang' the client, the broker should disconnect + start = time.time() + receiver.send_signal(signal.SIGSTOP) + deadline = time.time() + 10 + while time.time() < deadline: + if count == len(self._broker.agent.getAllConnections()): + break; + self.assertEqual(count, len(self._broker.agent.getAllConnections())) + receiver.send_signal(signal.SIGCONT) + receiver.teardown() + + +if __name__ == "__main__": + shutil.rmtree("brokertest.tmp", True) + os.execvp("qpid-python-test", + ["qpid-python-test", "-m", "idle_timeout_tests"] + sys.argv[1:]) |
