From 957c62d6ded7a3652e7897db20a23e070a6ad852 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 6 Mar 2019 06:44:25 -0800 Subject: Move all network connection IO into KafkaClient.poll() --- test/fixtures.py | 7 ++++--- test/test_client_async.py | 9 ++++----- 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'test') diff --git a/test/fixtures.py b/test/fixtures.py index 34373e6..8b156e6 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -405,10 +405,11 @@ class KafkaFixture(Fixture): retries = 10 while True: node_id = self._client.least_loaded_node() - for ready_retry in range(40): - if self._client.ready(node_id, False): + for connect_retry in range(40): + self._client.maybe_connect(node_id) + if self._client.connected(node_id): break - time.sleep(.1) + self._client.poll(timeout_ms=100) else: raise RuntimeError('Could not connect to broker with node id %d' % (node_id,)) diff --git a/test/test_client_async.py b/test/test_client_async.py index 09781ac..1c8a50f 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -125,8 +125,7 @@ def test_conn_state_change(mocker, cli, conn): conn.state = ConnectionStates.CONNECTED cli._conn_state_change(node_id, conn) assert node_id not in cli._connecting - sel.unregister.assert_called_with(conn._sock) - sel.register.assert_called_with(conn._sock, selectors.EVENT_READ, conn) + sel.modify.assert_called_with(conn._sock, selectors.EVENT_READ, conn) # Failure to connect should trigger metadata update assert cli.cluster._need_update is False @@ -145,7 +144,7 @@ def test_conn_state_change(mocker, cli, conn): def test_ready(mocker, cli, conn): - maybe_connect = mocker.patch.object(cli, '_maybe_connect') + maybe_connect = mocker.patch.object(cli, 'maybe_connect') node_id = 1 cli.ready(node_id) maybe_connect.assert_called_with(node_id) @@ -362,6 +361,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_connect', return_value=True) mocker.patch.object(client, '_maybe_connect', return_value=True) + mocker.patch.object(client, 'maybe_connect', return_value=True) now = time.time() t = mocker.patch('time.time') @@ -370,8 +370,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): # first poll attempts connection client.poll(timeout_ms=12345678) client._poll.assert_called_with(2.222) # reconnect backoff - client._can_connect.assert_called_once_with('foobar') - client._maybe_connect.assert_called_once_with('foobar') + client.maybe_connect.assert_called_once_with('foobar') # poll while connecting should not attempt a new connection client._connecting.add('foobar') -- cgit v1.2.1