summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-12 19:01:20 -0800
committerDana Powers <dana.powers@gmail.com>2016-03-12 19:01:20 -0800
commite42383dd84520b1087c0a64ae1017fe4b69eeef7 (patch)
tree9cef94dc5ff8315dcbb970f0eaf09e98f374f121 /test
parentccadb4dc8059865f9d7b0c4a65c5e480e65cd25f (diff)
parent561a678d1de1604262be43d47919fa68bdf17b17 (diff)
downloadkafka-python-e42383dd84520b1087c0a64ae1017fe4b69eeef7.tar.gz
Merge pull request #583 from dpkp/consumer_heartbeat_fixes
Fix Consumer Heartbeat Bugs
Diffstat (limited to 'test')
-rw-r--r--test/test_consumer_group.py27
-rw-r--r--test/test_coordinator.py16
2 files changed, 38 insertions, 5 deletions
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 6ef2020..3d10f8f 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -1,16 +1,17 @@
import collections
import logging
import threading
-import os
import time
import pytest
import six
-from kafka import SimpleClient, SimpleProducer
+from kafka import SimpleClient
from kafka.common import TopicPartition
-from kafka.conn import BrokerConnection, ConnectionStates
+from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
+from kafka.future import Future
+from kafka.protocol.metadata import MetadataResponse
from test.conftest import version
from test.testutil import random_string
@@ -115,3 +116,23 @@ def test_group(kafka_broker, topic):
finally:
for c in range(num_consumers):
stop[c].set()
+
+
+@pytest.fixture
+def conn(mocker):
+ conn = mocker.patch('kafka.client_async.BrokerConnection')
+ conn.return_value = conn
+ conn.state = ConnectionStates.CONNECTED
+ conn.send.return_value = Future().success(
+ MetadataResponse(
+ [(0, 'foo', 12), (1, 'bar', 34)], # brokers
+ [])) # topics
+ return conn
+
+
+def test_heartbeat_timeout(conn, mocker):
+ mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = '0.9')
+ mocker.patch('time.time', return_value = 1234)
+ consumer = KafkaConsumer('foobar')
+ mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
+ assert consumer._next_timeout() == 1234
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index 94e0e66..847cbc1 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -380,16 +380,20 @@ def test_maybe_auto_commit_offsets_sync(mocker, coordinator,
def patched_coord(mocker, coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
coordinator._subscription.needs_partition_assignment = False
- mocker.patch.object(coordinator, 'coordinator_unknown')
- coordinator.coordinator_unknown.return_value = False
+ mocker.patch.object(coordinator, 'coordinator_unknown', return_value=False)
coordinator.coordinator_id = 0
+ coordinator.generation = 0
+ mocker.patch.object(coordinator, 'need_rejoin', return_value=False)
mocker.patch.object(coordinator._client, 'least_loaded_node',
return_value=1)
mocker.patch.object(coordinator._client, 'ready', return_value=True)
mocker.patch.object(coordinator._client, 'send')
+ mocker.patch.object(coordinator._client, 'schedule')
mocker.spy(coordinator, '_failed_request')
mocker.spy(coordinator, '_handle_offset_commit_response')
mocker.spy(coordinator, '_handle_offset_fetch_response')
+ mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_success')
+ mocker.spy(coordinator.heartbeat_task, '_handle_heartbeat_failure')
return coordinator
@@ -573,3 +577,11 @@ def test_handle_offset_fetch_response(patched_coord, offsets,
assert future.value == offsets
assert patched_coord.coordinator_id is (None if dead else 0)
assert patched_coord._subscription.needs_partition_assignment is reassign
+
+
+def test_heartbeat(patched_coord):
+ patched_coord.coordinator_unknown.return_value = True
+
+ patched_coord.heartbeat_task()
+ assert patched_coord._client.schedule.call_count == 1
+ assert patched_coord.heartbeat_task._handle_heartbeat_failure.call_count == 1