#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import os
import shutil
import signal
import sys
import time

from brokertest import *


class AmqpIdleTimeoutTest(BrokerTest):
    """
    Test AMQP 1.0 idle-timeout support
    """

    # Seconds to sleep between polls while waiting for a condition, so the
    # wait loops do not spin the CPU or flood the broker with queries.
    _POLL_INTERVAL = 0.1

    def setUp(self):
        """Start a broker for the test, skipping when AMQP 1.0 is unavailable.

        Raises Skipped when the AMQP 1.0 library is not present or when the
        messaging module in use (``qm``) is not ``qpid_messaging``.
        """
        BrokerTest.setUp(self)
        if not BrokerTest.amqp_lib:
            raise Skipped("AMQP 1.0 library not found")
        if qm != qpid_messaging:
            raise Skipped("AMQP 1.0 client not found")
        self._broker = self.broker()

    def _wait_for(self, condition, timeout, interval=None):
        """Poll ``condition`` until it returns true or ``timeout`` expires.

        condition -- zero-argument callable returning a truthy/falsy value.
        timeout   -- maximum number of seconds to keep polling.
        interval  -- seconds to sleep between polls; defaults to
                     ``_POLL_INTERVAL``.

        Returns True if the condition held before the deadline, otherwise the
        truth value of one final evaluation made after the deadline.
        """
        if interval is None:
            interval = self._POLL_INTERVAL
        deadline = time.time() + timeout
        while time.time() < deadline:
            if condition():
                return True
            time.sleep(interval)
        # One last look, so a condition that became true during the final
        # sleep is not reported as a timeout.
        return bool(condition())

    def _connection_count(self):
        """Return the number of connections the broker agent reports."""
        return len(self._broker.agent.getAllConnections())

    def test_client_timeout(self):
        """Ensure that the client disconnects should the broker stop
        responding.
        """
        conn = self._broker.connect(native=False, timeout=None,
                                    protocol="amqp1.0", heartbeat=1)
        self.assertTrue(conn.isOpen())

        # Freeze the broker; the client is expected to notice the missing
        # heartbeats and drop the connection.  Allow up to 8 seconds before
        # declaring failure.
        os.kill(self._broker.pid, signal.SIGSTOP)
        try:
            self._wait_for(lambda: not conn.isOpen(), 8)
            self.assertTrue(not conn.isOpen())
        finally:
            # Always resume the broker, even when the assertion fails, so
            # that a frozen process is not left behind for teardown.
            os.kill(self._broker.pid, signal.SIGCONT)

    def test_broker_timeout(self):
        """By default, the broker will adopt the same timeout as the client
        (mimics the 0-10 timeout behavior).  Verify the broker disconnects
        unresponsive clients.
        """
        count = self._connection_count()

        # Create a new connection to the broker:
        receiver_cmd = ["qpid-receive",
                        "--broker", self._broker.host_port(),
                        "--address=amq.fanout",
                        "--connection-options={protocol:amqp1.0, heartbeat:1}",
                        "--forever"]
        receiver = self.popen(receiver_cmd, stdout=PIPE, stderr=PIPE,
                              expect=EXPECT_UNKNOWN)

        # Wait for the receiver's connection to show up on the broker.
        self._wait_for(lambda: count < self._connection_count(), 10)
        self.assertTrue(count < self._connection_count())

        # now 'hang' the client, the broker should disconnect
        os.kill(receiver.pid, signal.SIGSTOP)
        try:
            self._wait_for(lambda: count == self._connection_count(), 10)
            self.assertEqual(count, self._connection_count())
        finally:
            # Always resume the receiver, even when the assertion fails, so
            # that a frozen process is not left behind for teardown.
            os.kill(receiver.pid, signal.SIGCONT)
        receiver.teardown()


if __name__ == "__main__":
    # Clear any leftover working directory, then hand over to the test runner.
    shutil.rmtree("brokertest.tmp", True)
    os.execvp("qpid-python-test",
              ["qpid-python-test", "-m", "idle_timeout_tests"] + sys.argv[1:])