diff options
author | Dana Powers <dana.powers@gmail.com> | 2015-06-10 13:19:06 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2015-06-10 13:19:06 -0700 |
commit | 4dec5d31a9ce10670daa57fff1f4730a2b0378ee (patch) | |
tree | c0234bbe73df1f5ef0fcacae1fcd633cbebac301 | |
parent | 04cf4b36686131af392f401a32be7bf5567fb7c2 (diff) | |
parent | ed42d7899117e4bba8ef47afe825c13185cbdfc7 (diff) | |
download | kafka-python-4dec5d31a9ce10670daa57fff1f4730a2b0378ee.tar.gz |
Merge pull request #404 from dpkp/consumer_more_exception_handling
Consumer - handle exceptions in commit() and reset_partition_offset()
-rw-r--r-- | kafka/consumer/base.py | 30 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 24 | ||||
-rw-r--r-- | test/test_consumer.py | 44 |
3 files changed, 79 insertions, 19 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 6365cfa..b5383a3 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -8,7 +8,7 @@ from threading import Lock import kafka.common from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, - UnknownTopicOrPartitionError, check_error + UnknownTopicOrPartitionError, check_error, KafkaError ) from kafka.util import kafka_bytestring, ReentrantTimer @@ -114,12 +114,13 @@ class Consumer(object): self.offsets[resp.partition] = resp.offset def commit(self, partitions=None): - """ - Commit offsets for this consumer + """Commit stored offsets to Kafka via OffsetCommitRequest (v0) Keyword Arguments: partitions (list): list of partitions to commit, default is to commit all of them + + Returns: True on success, False on failure """ # short circuit if nothing happened. This check is kept outside @@ -135,22 +136,27 @@ class Consumer(object): reqs = [] if partitions is None: # commit all partitions - partitions = self.offsets.keys() + partitions = list(self.offsets.keys()) + log.info('Committing new offsets for %s, partitions %s', + self.topic, partitions) for partition in partitions: offset = self.offsets[partition] - log.debug("Commit offset %d in SimpleConsumer: " - "group=%s, topic=%s, partition=%s" % - (offset, self.group, self.topic, partition)) + log.debug('Commit offset %d in SimpleConsumer: ' + 'group=%s, topic=%s, partition=%s', + offset, self.group, self.topic, partition) reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) - resps = self.client.send_offset_commit_request(self.group, reqs) - for resp in resps: - kafka.common.check_error(resp) - - self.count_since_commit = 0 + try: + self.client.send_offset_commit_request(self.group, reqs) + except KafkaError as e: + log.error('%s saving offsets: %s', e.__class__.__name__, e) + return False + else: + self.count_since_commit = 0 + return True def _auto_commit(self): """ diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index e4233ff..c75e78b 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -27,7 +27,7 @@ from .base import ( NO_MESSAGES_WAIT_TIME_SECONDS ) from ..common import ( - FetchRequest, OffsetRequest, + FetchRequest, KafkaError, OffsetRequest, ConsumerFetchSizeTooSmall, ConsumerNoMoreData, UnknownTopicOrPartitionError, NotLeaderForPartitionError, OffsetOutOfRangeError, FailedPayloadsError, check_error @@ -144,6 +144,13 @@ class SimpleConsumer(Consumer): (self.group, self.topic, str(self.offsets.keys())) def reset_partition_offset(self, partition): + """Update offsets using auto_offset_reset policy (smallest|largest) + + Arguments: + partition (int): the partition for which offsets should be updated + + Returns: Updated offset on success, None on failure + """ LATEST = -1 EARLIEST = -2 if self.auto_offset_reset == 'largest': @@ -163,10 +170,17 @@ class SimpleConsumer(Consumer): raise # send_offset_request - (resp, ) = self.client.send_offset_request(reqs) - check_error(resp) - self.offsets[partition] = resp.offsets[0] - self.fetch_offsets[partition] = resp.offsets[0] + log.info('Resetting topic-partition offset to %s for %s:%d', + self.auto_offset_reset, self.topic, partition) + try: + (resp, ) = self.client.send_offset_request(reqs) + except KafkaError as e: + log.error('%s sending offset request for %s:%d', + e.__class__.__name__, self.topic, partition) + else: + self.offsets[partition] = resp.offsets[0] + self.fetch_offsets[partition] = resp.offsets[0] + return resp.offsets[0] def provide_partition_info(self): """ diff --git a/test/test_consumer.py b/test/test_consumer.py index 08fd620..df15115 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -4,7 +4,7 @@ from . import unittest from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer from kafka.common import ( - KafkaConfigurationError, FetchResponse, + KafkaConfigurationError, FetchResponse, OffsetFetchResponse, FailedPayloadsError, OffsetAndMessage, NotLeaderForPartitionError, UnknownTopicOrPartitionError ) @@ -25,10 +25,11 @@ class TestMultiProcessConsumer(unittest.TestCase): client = MagicMock() partitions = (0,) with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets: - consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions) + MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions) self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) ) self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member +class TestSimpleConsumer(unittest.TestCase): def test_simple_consumer_failed_payloads(self): client = MagicMock() consumer = SimpleConsumer(client, group=None, @@ -80,6 +81,45 @@ class TestMultiProcessConsumer(unittest.TestCase): with self.assertRaises(UnknownTopicOrPartitionError): consumer.get_messages(20) + def test_simple_consumer_commit_does_not_raise(self): + client = MagicMock() + client.get_partition_ids_for_topic.return_value = [0, 1] + + def mock_offset_fetch_request(group, payloads, **kwargs): + return [OffsetFetchResponse(p.topic, p.partition, 0, b'', 0) for p in payloads] + + client.send_offset_fetch_request.side_effect = mock_offset_fetch_request + + def mock_offset_commit_request(group, payloads, **kwargs): + raise FailedPayloadsError(payloads[0]) + + client.send_offset_commit_request.side_effect = mock_offset_commit_request + + consumer = SimpleConsumer(client, group='foobar', + topic='topic', partitions=[0, 1], + auto_commit=False) + + # Mock internal commit check + consumer.count_since_commit = 10 + + # This should not raise an exception + self.assertFalse(consumer.commit(partitions=[0, 1])) + + def test_simple_consumer_reset_partition_offset(self): + client = MagicMock() + + def mock_offset_request(payloads, **kwargs): + raise FailedPayloadsError(payloads[0]) + + client.send_offset_request.side_effect = mock_offset_request + + consumer = SimpleConsumer(client, group='foobar', + topic='topic', partitions=[0, 1], + auto_commit=False) + + # This should not raise an exception + self.assertEqual(consumer.reset_partition_offset(0), None) + @staticmethod def fail_requests_factory(error_factory): # Mock so that only the first request gets a valid response |