summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorKen Giusti <kgiusti@apache.org>2015-02-02 16:05:43 +0000
committerKen Giusti <kgiusti@apache.org>2015-02-02 16:05:43 +0000
commitd567756feadab00dc6c9ef96a9f93aa210005632 (patch)
treec8ad731e6fd5dead8d31a5dbfe83eaec6818008d /qpid/cpp/src
parent61753c78d5b7551a53b614a093e2e5483af4289b (diff)
downloadqpid-python-d567756feadab00dc6c9ef96a9f93aa210005632.tar.gz
QPID-5538: Implement AMQP 1.0 connection idle-timeout.
Original patch by Gordon Sim. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1656505 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp70
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp137
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h7
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/Transport.h1
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt1
-rwxr-xr-xqpid/cpp/src/tests/idle_timeout_tests.py95
9 files changed, 290 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index 3a93e2aac5..076af79411 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -121,7 +121,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker
connection(pn_connection()),
transport(pn_transport()),
collector(0),
- out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false)
+ out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false), ioRequested(false)
{
#ifdef HAVE_PROTON_EVENTS
collector = pn_collector();
@@ -157,6 +157,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker
void Connection::requestIO()
{
+ ioRequested = true;
out.activateOutput();
}
@@ -179,13 +180,24 @@ size_t Connection::decode(const char* buffer, size_t size)
{
QPID_LOG(trace, id << " decode(" << size << ")");
if (size == 0) return 0;
- //TODO: Fix pn_engine_input() to take const buffer
+
ssize_t n = pn_transport_input(transport, const_cast<char*>(buffer), size);
if (n > 0 || n == PN_EOS) {
- //If engine returns EOS, have no way of knowing how many bytes
- //it processed, but can assume none need to be reprocessed so
- //consider them all read:
- if (n == PN_EOS) n = size;
+ // PN_EOS either means we received a Close (which also means we've
+ // consumed all the input), OR some Very Bad Thing happened and this
+ // connection is toast.
+ if (n == PN_EOS)
+ {
+ std::string error;
+ if (checkTransportError(error)) {
+ // "He's dead, Jim."
+ QPID_LOG_CAT(error, network, id << " connection failed: " << error);
+ out.abort();
+ return 0;
+ } else {
+ n = size; // assume all consumed
+ }
+ }
QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size);
try {
process();
@@ -224,6 +236,15 @@ size_t Connection::encode(char* buffer, size_t size)
QPID_LOG_CAT(debug, network, id << " encoded " << n << " bytes from " << size)
haveOutput = true;
return n;
+ } else if (n == PN_EOS) {
+ haveOutput = false;
+ // Normal close, or error?
+ std::string error;
+ if (checkTransportError(error)) {
+ QPID_LOG_CAT(error, network, id << " connection failed: " << error);
+ out.abort();
+ }
+ return 0;
} else if (n == PN_ERR) {
throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, QPID_MSG("Error on output: " << getError()));
} else {
@@ -291,6 +312,7 @@ bool Connection::canEncode()
} else {
QPID_LOG(info, "Connection " << id << " has been closed locally");
}
+ if (ioRequested.valueCompareAndSwap(true, false)) haveOutput = true;
pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC);
QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
return haveOutput;
@@ -303,8 +325,22 @@ void Connection::open()
pn_connection_set_container(connection, getBroker().getFederationTag().c_str());
uint32_t timeout = pn_transport_get_remote_idle_timeout(transport);
if (timeout) {
- ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(timeout, getBroker().getTimer(), *this));
- pn_transport_set_idle_timeout(transport, timeout);
+ // if idle generate empty frames at 1/2 the timeout interval as keepalives:
+ ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask((timeout+1)/2,
+ getBroker().getTimer(),
+ *this));
+ getBroker().getTimer().add(ticker);
+
+ // Note: in version 0-10 of the protocol, idle timeout applies to both
+ // ends. AMQP 1.0 changes that - it's now asymmetric: each end can
+ // configure/disable it independently. For backward compatibility, by
+ // default mimic the old behavior and set our local timeout.
+ // Use 2x the remote's timeout, as per the spec the remote should
+ // advertise 1/2 its actual timeout threshold
+ pn_transport_set_idle_timeout(transport, timeout * 2);
+ QPID_LOG_CAT(debug, network, id << " AMQP 1.0 idle-timeout set:"
+ << " local=" << pn_transport_get_idle_timeout(transport)
+ << " remote=" << pn_transport_get_remote_idle_timeout(transport));
}
pn_connection_open(connection);
@@ -585,4 +621,22 @@ void Connection::doDeliveryUpdated(pn_delivery_t *delivery)
}
}
+// check for failures of the transport:
+bool Connection::checkTransportError(std::string& text)
+{
+ std::stringstream info;
+
+#ifdef USE_PROTON_TRANSPORT_CONDITION
+ pn_condition_t* tcondition = pn_transport_condition(transport);
+ if (pn_condition_is_set(tcondition))
+ info << "transport error: " << pn_condition_get_name(tcondition) << ", " << pn_condition_get_description(tcondition);
+#else
+ pn_error_t* terror = pn_transport_error(transport);
+ if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]";
+#endif
+
+ text = info.str();
+ return !text.empty();
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index ea4ce06163..e97d041c03 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -24,6 +24,7 @@
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/broker/amqp/BrokerContext.h"
#include "qpid/broker/amqp/ManagedConnection.h"
+#include "qpid/sys/AtomicValue.h"
#include <map>
#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
@@ -80,6 +81,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
bool closeInitiated;
bool closeRequested;
boost::intrusive_ptr<sys::TimerTask> ticker;
+ qpid::sys::AtomicValue<bool> ioRequested;
virtual void process();
void doOutput(size_t);
@@ -92,6 +94,8 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
void closedByManagement();
private:
+ bool checkTransportError(std::string&);
+
// handle Proton engine events
void doConnectionRemoteOpen();
void doConnectionRemoteClose();
@@ -100,7 +104,6 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
void doLinkRemoteOpen(pn_link_t *link);
void doLinkRemoteClose(pn_link_t *link);
void doDeliveryUpdated(pn_delivery_t *delivery);
-
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index fedab4286f..0969f76ae4 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -39,6 +39,7 @@
#include "qpid/sys/SecurityLayer.h"
#include "qpid/sys/SystemInfo.h"
#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
#include "qpid/sys/urlAdd.h"
#include "config.h"
#include <boost/lexical_cast.hpp>
@@ -95,6 +96,27 @@ std::string get_error(pn_connection_t* connection, pn_transport_t* transport)
}
#endif
+class ConnectionTickerTask : public qpid::sys::TimerTask
+{
+ qpid::sys::Timer& timer;
+ ConnectionContext& connection;
+ public:
+ ConnectionTickerTask(const qpid::sys::Duration& interval, qpid::sys::Timer& t, ConnectionContext& c) :
+ TimerTask(interval, "ConnectionTicker"),
+ timer(t),
+ connection(c)
+ {}
+
+ void fire() {
+ QPID_LOG(debug, "ConnectionTickerTask fired");
+ // Setup next firing
+ setupNextFire();
+ timer.add(this);
+
+ // Send Ticker
+ connection.activateOutput();
+ }
+};
}
void ConnectionContext::trace(const char* message) const
@@ -118,23 +140,15 @@ ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::
// Concatenate all known URLs into a single URL, get rid of duplicate addresses.
sys::urlAddStrings(fullUrl, urls.begin(), urls.end(), protocol.empty() ?
qpid::Address::TCP : protocol);
- if (pn_transport_bind(engine, connection)) {
- //error
- }
if (identifier.empty()) {
identifier = qpid::types::Uuid(true).str();
}
- pn_connection_set_container(connection, identifier.c_str());
- bool enableTrace(false);
- QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
- if (enableTrace) {
- pn_transport_trace(engine, PN_TRACE_FRM);
- set_tracer(engine, this);
- }
+ configureConnection();
}
ConnectionContext::~ConnectionContext()
{
+ if (ticker) ticker->cancel();
close();
sessions.clear();
pn_transport_free(engine);
@@ -218,6 +232,10 @@ void ConnectionContext::close()
lock.wait();
}
}
+ if (ticker) {
+ ticker->cancel();
+ ticker.reset();
+ }
}
bool ConnectionContext::fetch(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
@@ -498,7 +516,7 @@ uint32_t ConnectionContext::getUnsettled(boost::shared_ptr<ReceiverContext> rece
void ConnectionContext::activateOutput()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- wakeupDriver();
+ if (state == CONNECTED) wakeupDriver();
}
/**
* Expects lock to be held by caller
@@ -530,14 +548,11 @@ void ConnectionContext::reset()
engine = pn_transport();
connection = pn_connection();
- pn_connection_set_container(connection, identifier.c_str());
- bool enableTrace(false);
- QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
- if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM);
+ configureConnection();
+
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
i->second->reset(connection);
}
- pn_transport_bind(engine, connection);
}
void ConnectionContext::check() {
@@ -758,12 +773,23 @@ std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size)
//TODO: Fix pn_engine_input() to take const buffer
ssize_t n = pn_transport_input(engine, const_cast<char*>(buffer), size);
if (n > 0 || n == PN_EOS) {
- //If engine returns EOS, have no way of knowing how many bytes
- //it processed, but can assume none need to be reprocessed so
- //consider them all read:
- if (n == PN_EOS) n = size;
+ // PN_EOS either means we received a Close (which also means we've
+ // consumed all the input), OR some Very Bad Thing happened and this
+ // connection is toast.
+ if (n == PN_EOS)
+ {
+ std::string error;
+ if (checkTransportError(error)) {
+ // "He's dead, Jim."
+ QPID_LOG_CAT(error, network, id << " connection failed: " << error);
+ transport->abort();
+ return 0;
+ } else {
+ n = size; // assume all consumed
+ }
+ }
QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size)
- pn_transport_tick(engine, 0);
+ pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC);
lock.notifyAll();
return n;
} else if (n == PN_ERR) {
@@ -795,7 +821,13 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
throw MessagingException(QPID_MSG("Error on output: " << getError()));
} else if (n == PN_EOS) {
haveOutput = false;
- return 0;//Is this right?
+ // Normal close, or error?
+ std::string error;
+ if (checkTransportError(error)) {
+ QPID_LOG_CAT(error, network, id << " connection failed: " << error);
+ transport->abort();
+ }
+ return 0;
} else {
haveOutput = false;
return 0;
@@ -804,6 +836,7 @@ std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
bool ConnectionContext::canEncodePlain()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ pn_transport_tick(engine, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC);
return haveOutput && state == CONNECTED;
}
void ConnectionContext::closed()
@@ -1061,7 +1094,6 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) {
}
QPID_LOG(debug, id << " Opening...");
- setProperties();
pn_connection_open(connection);
wakeupDriver(); //want to write
while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) &&
@@ -1071,6 +1103,25 @@ bool ConnectionContext::tryOpenAddr(const qpid::Address& addr) {
if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
throw qpid::messaging::ConnectionError("Failed to open connection");
}
+
+ // Connection open - check for idle timeout from the remote and start a
+ // periodic tick to monitor for idle connections
+ pn_timestamp_t remote = pn_transport_get_remote_idle_timeout(engine);
+ pn_timestamp_t local = pn_transport_get_idle_timeout(engine);
+ uint64_t shortest = ((remote && local)
+ ? std::min(remote, local)
+ : (remote) ? remote : local);
+ if (shortest) {
+ // send an idle frame at least twice before timeout
+ shortest = (shortest + 1)/2;
+ qpid::sys::Duration d(shortest * qpid::sys::TIME_MSEC);
+ ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(d, driver->getTimer(), *this));
+ driver->getTimer().add(ticker);
+ QPID_LOG(debug, id << " AMQP 1.0 idle-timeout set:"
+ << " local=" << pn_transport_get_idle_timeout(engine)
+ << " remote=" << pn_transport_get_remote_idle_timeout(engine));
+ }
+
QPID_LOG(debug, id << " Opened");
return restartSessions();
@@ -1151,4 +1202,44 @@ bool ConnectionContext::CodecAdapter::canEncode()
}
+// setup the transport and connection objects:
+void ConnectionContext::configureConnection()
+{
+ pn_connection_set_container(connection, identifier.c_str());
+ setProperties();
+ if (heartbeat) {
+ // fail an idle connection at 2 x heartbeat (in msecs)
+ pn_transport_set_idle_timeout(engine, heartbeat*2*1000);
+ }
+
+ bool enableTrace(false);
+ QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
+ if (enableTrace) {
+ pn_transport_trace(engine, PN_TRACE_FRM);
+ set_tracer(engine, this);
+ }
+
+ int err = pn_transport_bind(engine, connection);
+ if (err)
+ QPID_LOG(error, id << " Error binding connection and transport: " << err);
+}
+
+
+// check for failures of the transport:
+bool ConnectionContext::checkTransportError(std::string& text)
+{
+ std::stringstream info;
+
+#ifdef USE_PROTON_TRANSPORT_CONDITION
+ pn_condition_t* tcondition = pn_transport_condition(engine);
+ if (pn_condition_is_set(tcondition))
+ info << "transport error: " << pn_condition_get_name(tcondition) << ", " << pn_condition_get_description(tcondition);
+#else
+ pn_error_t* terror = pn_transport_error(engine);
+ if (terror) info << "transport error " << pn_error_text(terror) << " [" << terror << "]";
+#endif
+
+ text = info.str();
+ return !text.empty();
+}
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 12f3c21e0a..80da9dff10 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -25,6 +25,7 @@
#include <map>
#include <memory>
#include <string>
+#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include "qpid/Url.h"
#include "qpid/messaging/ConnectionOptions.h"
@@ -47,6 +48,7 @@ class ProtocolVersion;
namespace sys {
class SecurityLayer;
struct SecuritySettings;
+class TimerTask;
}
namespace messaging {
class Duration;
@@ -120,7 +122,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void initSecurityLayer(qpid::sys::SecurityLayer&);
void trace(const char*) const;
- private:
+ private:
typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap;
class CodecAdapter : public qpid::sys::Codec
{
@@ -155,6 +157,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
std::auto_ptr<Sasl> sasl;
CodecAdapter codecAdapter;
bool notifyOnWrite;
+ boost::intrusive_ptr<qpid::sys::TimerTask> ticker;
void check();
bool checkDisconnected();
@@ -191,6 +194,8 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
std::string getError();
bool useSasl();
void setProperties();
+ void configureConnection();
+ bool checkTransportError(std::string&);
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
index 16307b3c22..ebe3fff1cb 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.cpp
@@ -22,13 +22,14 @@
#include "Transport.h"
#include "qpid/messaging/exceptions.h"
#include "qpid/sys/Poller.h"
+#include "qpid/sys/Timer.h"
#include "qpid/log/Statement.h"
namespace qpid {
namespace messaging {
namespace amqp {
-DriverImpl::DriverImpl() : poller(new qpid::sys::Poller)
+DriverImpl::DriverImpl() : poller(new qpid::sys::Poller), timer(new qpid::sys::Timer)
{
start();
}
@@ -48,6 +49,7 @@ void DriverImpl::stop()
QPID_LOG(debug, "Driver stopped");
poller->shutdown();
thread.join();
+ timer->stop();
}
boost::shared_ptr<Transport> DriverImpl::getTransport(const std::string& protocol, TransportContext& connection)
diff --git a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h
index 354fa1ae35..36cb196343 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/DriverImpl.h
@@ -29,6 +29,7 @@
namespace qpid {
namespace sys {
class Poller;
+class Timer;
}
namespace messaging {
namespace amqp {
@@ -47,11 +48,14 @@ class DriverImpl
void stop();
boost::shared_ptr<Transport> getTransport(const std::string& protocol, TransportContext& connection);
+ sys::Timer& getTimer() { return *timer; }
static boost::shared_ptr<DriverImpl> getDefault();
private:
boost::shared_ptr<qpid::sys::Poller> poller;
qpid::sys::Thread thread;
+ std::auto_ptr<sys::Timer> timer;
+
static qpid::sys::Mutex defaultLock;
static boost::weak_ptr<DriverImpl> theDefault;
};
diff --git a/qpid/cpp/src/qpid/messaging/amqp/Transport.h b/qpid/cpp/src/qpid/messaging/amqp/Transport.h
index 159916f9ae..6ec99ab58f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/Transport.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/Transport.h
@@ -40,6 +40,7 @@ class Transport : public qpid::sys::OutputControl
virtual ~Transport() {}
virtual void connect(const std::string& host, const std::string& port) = 0;
virtual void close() = 0;
+ virtual void abort() = 0;
virtual const qpid::sys::SecuritySettings* getSecuritySettings() = 0;
typedef Transport* Factory(TransportContext&, boost::shared_ptr<qpid::sys::Poller>);
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index 08a8c69d69..c914c50e33 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -364,6 +364,7 @@ add_test (ha_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py)
add_test (qpidd_qmfv2_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py)
if (BUILD_AMQP)
add_test (interlink_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py)
+ add_test (idle_timeout_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/idle_timeout_tests.py)
endif (BUILD_AMQP)
add_test (swig_python_tests ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/swig_python_tests${test_script_suffix})
add_test (ipv6_test ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})
diff --git a/qpid/cpp/src/tests/idle_timeout_tests.py b/qpid/cpp/src/tests/idle_timeout_tests.py
new file mode 100755
index 0000000000..c3cc00746b
--- /dev/null
+++ b/qpid/cpp/src/tests/idle_timeout_tests.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import shutil
+import signal
+import sys
+
+from brokertest import *
+
+class AmqpIdleTimeoutTest(BrokerTest):
+ """
+ Test AMQP 1.0 idle-timeout support
+ """
+ def setUp(self):
+ BrokerTest.setUp(self)
+ if not BrokerTest.amqp_lib:
+ raise Skipped("AMQP 1.0 library not found")
+ if qm != qpid_messaging:
+ raise Skipped("AMQP 1.0 client not found")
+ self._broker = self.broker()
+
+ def test_client_timeout(self):
+ """Ensure that the client disconnects should the broker stop
+ responding.
+ """
+ conn = self._broker.connect(native=False, timeout=None,
+ protocol="amqp1.0", heartbeat=1)
+ self.assertTrue(conn.isOpen())
+ # should disconnect within 2 seconds of broker stop
+ deadline = time.time() + 8
+ os.kill(self._broker.pid, signal.SIGSTOP)
+ while time.time() < deadline:
+ if not conn.isOpen():
+ break;
+ self.assertTrue(not conn.isOpen())
+ os.kill(self._broker.pid, signal.SIGCONT)
+
+
+ def test_broker_timeout(self):
+ """By default, the broker will adopt the same timeout as the client
+ (mimics the 0-10 timeout behavior). Verify the broker disconnects
+ unresponsive clients.
+ """
+
+ count = len(self._broker.agent.getAllConnections())
+
+ # Create a new connection to the broker:
+ receiver_cmd = ["qpid-receive",
+ "--broker", self._broker.host_port(),
+ "--address=amq.fanout",
+ "--connection-options={protocol:amqp1.0, heartbeat:1}",
+ "--forever"]
+ receiver = self.popen(receiver_cmd, stdout=PIPE, stderr=PIPE,
+ expect=EXPECT_UNKNOWN)
+ start = time.time()
+ deadline = time.time() + 10
+ while time.time() < deadline:
+ if count < len(self._broker.agent.getAllConnections()):
+ break;
+ self.assertTrue(count < len(self._broker.agent.getAllConnections()))
+
+ # now 'hang' the client, the broker should disconnect
+ start = time.time()
+ receiver.send_signal(signal.SIGSTOP)
+ deadline = time.time() + 10
+ while time.time() < deadline:
+ if count == len(self._broker.agent.getAllConnections()):
+ break;
+ self.assertEqual(count, len(self._broker.agent.getAllConnections()))
+ receiver.send_signal(signal.SIGCONT)
+ receiver.teardown()
+
+
+if __name__ == "__main__":
+ shutil.rmtree("brokertest.tmp", True)
+ os.execvp("qpid-python-test",
+ ["qpid-python-test", "-m", "idle_timeout_tests"] + sys.argv[1:])