diff options
Diffstat (limited to 'test/test_coordinator.py')
-rw-r--r-- | test/test_coordinator.py | 100 |
1 files changed, 61 insertions, 39 deletions
diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 0e96110..7dc0e04 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -10,6 +10,7 @@ from kafka.consumer.subscription_state import ( SubscriptionState, ConsumerRebalanceListener) from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor +from kafka.coordinator.base import Generation, MemberState, HeartbeatThread from kafka.coordinator.consumer import ConsumerCoordinator from kafka.coordinator.protocol import ( ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment) @@ -43,13 +44,13 @@ def test_autocommit_enable_api_version(client, api_version): coordinator = ConsumerCoordinator(client, SubscriptionState(), Metrics(), enable_auto_commit=True, + session_timeout_ms=30000, # session_timeout_ms and max_poll_interval_ms + max_poll_interval_ms=30000, # should be the same to avoid KafkaConfigurationError group_id='foobar', api_version=api_version) if api_version < (0, 8, 1): - assert coordinator._auto_commit_task is None assert coordinator.config['enable_auto_commit'] is False else: - assert coordinator._auto_commit_task is not None assert coordinator.config['enable_auto_commit'] is True @@ -269,19 +270,19 @@ def test_close(mocker, coordinator): mocker.patch.object(coordinator, '_handle_leave_group_response') mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) coordinator.coordinator_id = 0 - coordinator.generation = 1 + coordinator._generation = Generation(1, 'foobar', b'') + coordinator.state = MemberState.STABLE cli = coordinator._client - mocker.patch.object(cli, 'unschedule') mocker.patch.object(cli, 'send', return_value=Future().success('foobar')) mocker.patch.object(cli, 'poll') coordinator.close() assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1 - cli.unschedule.assert_called_with(coordinator.heartbeat_task) coordinator._handle_leave_group_response.assert_called_with('foobar') - assert coordinator.generation == -1 - assert coordinator.member_id == '' + assert coordinator.generation() is None + assert coordinator._generation is Generation.NO_GENERATION + assert coordinator.state is MemberState.UNJOINED assert coordinator.rejoin_needed is True @@ -296,6 +297,7 @@ def offsets(): def test_commit_offsets_async(mocker, coordinator, offsets): mocker.patch.object(coordinator._client, 'poll') mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) + mocker.patch.object(coordinator, 'ensure_coordinator_ready') mocker.patch.object(coordinator, '_send_offset_commit_request', return_value=Future().success('fizzbuzz')) coordinator.commit_offsets_async(offsets) @@ -362,19 +364,21 @@ def test_maybe_auto_commit_offsets_sync(mocker, api_version, group_id, enable, coordinator = ConsumerCoordinator(client, SubscriptionState(), Metrics(), api_version=api_version, + session_timeout_ms=30000, + max_poll_interval_ms=30000, enable_auto_commit=enable, group_id=group_id) commit_sync = mocker.patch.object(coordinator, 'commit_offsets_sync', side_effect=error) if has_auto_commit: - assert coordinator._auto_commit_task is not None + assert coordinator.next_auto_commit_deadline is not None else: - assert coordinator._auto_commit_task is None + assert coordinator.next_auto_commit_deadline is None assert coordinator._maybe_auto_commit_offsets_sync() is None if has_auto_commit: - assert coordinator._auto_commit_task is not None + assert coordinator.next_auto_commit_deadline is not None assert commit_sync.call_count == (1 if commit_offsets else 0) assert mock_warn.call_count == (1 if warn else 0) @@ -387,24 +391,25 @@ def patched_coord(mocker, coordinator): coordinator._subscription.needs_partition_assignment = False mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False) coordinator.coordinator_id = 0 - coordinator.generation = 0 + mocker.patch.object(coordinator, 'coordinator', return_value=0) + coordinator._generation = Generation(0, 'foobar', b'') + coordinator.state = MemberState.STABLE + coordinator.rejoin_needed = False mocker.patch.object(coordinator, 'need_rejoin', return_value=False) mocker.patch.object(coordinator._client, 'least_loaded_node', return_value=1) mocker.patch.object(coordinator._client, 'ready', return_value=True) mocker.patch.object(coordinator._client, 'send') - mocker.patch.object(coordinator._client, 'schedule') mocker.spy(coordinator, '_failed_request') mocker.spy(coordinator, '_handle_offset_commit_response') mocker.spy(coordinator, '_handle_offset_fetch_response') - mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_success') - mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_failure') return coordinator -def test_send_offset_commit_request_fail(patched_coord, offsets): +def test_send_offset_commit_request_fail(mocker, patched_coord, offsets): patched_coord.coordinator_unknown.return_value = True patched_coord.coordinator_id = None + patched_coord.coordinator.return_value = None # No offsets ret = patched_coord._send_offset_commit_request({}) @@ -488,7 +493,14 @@ def test_handle_offset_commit_response(mocker, patched_coord, offsets, response) assert isinstance(future.exception, error) assert patched_coord.coordinator_id is (None if dead else 0) - assert patched_coord._subscription.needs_partition_assignment is reassign + if reassign: + assert patched_coord._generation is Generation.NO_GENERATION + assert patched_coord.rejoin_needed is True + assert patched_coord.state is MemberState.UNJOINED + else: + assert patched_coord._generation is not Generation.NO_GENERATION + assert patched_coord.rejoin_needed is False + assert patched_coord.state is MemberState.STABLE @pytest.fixture @@ -496,9 +508,10 @@ def partitions(): return [TopicPartition('foobar', 0), TopicPartition('foobar', 1)] -def test_send_offset_fetch_request_fail(patched_coord, partitions): +def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions): patched_coord.coordinator_unknown.return_value = True patched_coord.coordinator_id = None + patched_coord.coordinator.return_value = None # No partitions ret = patched_coord._send_offset_fetch_request([]) @@ -551,28 +564,18 @@ def test_send_offset_fetch_request_success(patched_coord, partitions): future, response) -@pytest.mark.parametrize('response,error,dead,reassign', [ - #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 30), (1, 234, b'', 30)])]), - # Errors.GroupAuthorizationFailedError, False, False), - #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 7), (1, 234, b'', 7)])]), - # Errors.RequestTimedOutError, True, False), - #(OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 27), (1, 234, b'', 27)])]), - # Errors.RebalanceInProgressError, False, True), +@pytest.mark.parametrize('response,error,dead', [ (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 14), (1, 234, b'', 14)])]), - Errors.GroupLoadInProgressError, False, False), + Errors.GroupLoadInProgressError, False), (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 16), (1, 234, b'', 16)])]), - Errors.NotCoordinatorForGroupError, True, False), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 25), (1, 234, b'', 25)])]), - Errors.UnknownMemberIdError, False, True), - (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 22), (1, 234, b'', 22)])]), - Errors.IllegalGenerationError, False, True), + Errors.NotCoordinatorForGroupError, True), (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 29), (1, 234, b'', 29)])]), - Errors.TopicAuthorizationFailedError, False, False), + Errors.TopicAuthorizationFailedError, False), (OffsetFetchResponse[0]([('foobar', [(0, 123, b'', 0), (1, 234, b'', 0)])]), - None, False, False), + None, False), ]) def test_handle_offset_fetch_response(patched_coord, offsets, - response, error, dead, reassign): + response, error, dead): future = Future() patched_coord._handle_offset_fetch_response(future, response) if error is not None: @@ -581,15 +584,34 @@ def test_handle_offset_fetch_response(patched_coord, offsets, assert future.succeeded() assert future.value == offsets assert patched_coord.coordinator_id is (None if dead else 0) - assert patched_coord._subscription.needs_partition_assignment is reassign -def test_heartbeat(patched_coord): - patched_coord.coordinator_unknown.return_value = True +def test_heartbeat(mocker, patched_coord): + heartbeat = HeartbeatThread(patched_coord) + + assert not heartbeat.enabled and not heartbeat.closed + + heartbeat.enable() + assert heartbeat.enabled + + heartbeat.disable() + assert not heartbeat.enabled + + # heartbeat disables when un-joined + heartbeat.enable() + patched_coord.state = MemberState.UNJOINED + heartbeat._run_once() + assert not heartbeat.enabled + + heartbeat.enable() + patched_coord.state = MemberState.STABLE + mocker.spy(patched_coord, '_send_heartbeat_request') + mocker.patch.object(patched_coord.heartbeat, 'should_heartbeat', return_value=True) + heartbeat._run_once() + assert patched_coord._send_heartbeat_request.call_count == 1 - patched_coord.heartbeat_task() - assert patched_coord._client.schedule.call_count == 1 - assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1 + heartbeat.close() + assert heartbeat.closed def test_lookup_coordinator_failure(mocker, coordinator): |