summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-06-10 13:19:06 -0700
committerDana Powers <dana.powers@gmail.com>2015-06-10 13:19:06 -0700
commit4dec5d31a9ce10670daa57fff1f4730a2b0378ee (patch)
treec0234bbe73df1f5ef0fcacae1fcd633cbebac301
parent04cf4b36686131af392f401a32be7bf5567fb7c2 (diff)
parented42d7899117e4bba8ef47afe825c13185cbdfc7 (diff)
downloadkafka-python-4dec5d31a9ce10670daa57fff1f4730a2b0378ee.tar.gz
Merge pull request #404 from dpkp/consumer_more_exception_handling
Consumer - handle exceptions in commit() and reset_partition_offset()
-rw-r--r--kafka/consumer/base.py30
-rw-r--r--kafka/consumer/simple.py24
-rw-r--r--test/test_consumer.py44
3 files changed, 79 insertions, 19 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 6365cfa..b5383a3 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -8,7 +8,7 @@ from threading import Lock
import kafka.common
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
- UnknownTopicOrPartitionError, check_error
+ UnknownTopicOrPartitionError, check_error, KafkaError
)
from kafka.util import kafka_bytestring, ReentrantTimer
@@ -114,12 +114,13 @@ class Consumer(object):
self.offsets[resp.partition] = resp.offset
def commit(self, partitions=None):
- """
- Commit offsets for this consumer
+ """Commit stored offsets to Kafka via OffsetCommitRequest (v0)
Keyword Arguments:
partitions (list): list of partitions to commit, default is to commit
all of them
+
+ Returns: True on success, False on failure
"""
# short circuit if nothing happened. This check is kept outside
@@ -135,22 +136,27 @@ class Consumer(object):
reqs = []
if partitions is None: # commit all partitions
- partitions = self.offsets.keys()
+ partitions = list(self.offsets.keys())
+ log.info('Committing new offsets for %s, partitions %s',
+ self.topic, partitions)
for partition in partitions:
offset = self.offsets[partition]
- log.debug("Commit offset %d in SimpleConsumer: "
- "group=%s, topic=%s, partition=%s" %
- (offset, self.group, self.topic, partition))
+ log.debug('Commit offset %d in SimpleConsumer: '
+ 'group=%s, topic=%s, partition=%s',
+ offset, self.group, self.topic, partition)
reqs.append(OffsetCommitRequest(self.topic, partition,
offset, None))
- resps = self.client.send_offset_commit_request(self.group, reqs)
- for resp in resps:
- kafka.common.check_error(resp)
-
- self.count_since_commit = 0
+ try:
+ self.client.send_offset_commit_request(self.group, reqs)
+ except KafkaError as e:
+ log.error('%s saving offsets: %s', e.__class__.__name__, e)
+ return False
+ else:
+ self.count_since_commit = 0
+ return True
def _auto_commit(self):
"""
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index e4233ff..c75e78b 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -27,7 +27,7 @@ from .base import (
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
- FetchRequest, OffsetRequest,
+ FetchRequest, KafkaError, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
@@ -144,6 +144,13 @@ class SimpleConsumer(Consumer):
(self.group, self.topic, str(self.offsets.keys()))
def reset_partition_offset(self, partition):
+ """Update offsets using auto_offset_reset policy (smallest|largest)
+
+ Arguments:
+ partition (int): the partition for which offsets should be updated
+
+ Returns: Updated offset on success, None on failure
+ """
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
@@ -163,10 +170,17 @@ class SimpleConsumer(Consumer):
raise
# send_offset_request
- (resp, ) = self.client.send_offset_request(reqs)
- check_error(resp)
- self.offsets[partition] = resp.offsets[0]
- self.fetch_offsets[partition] = resp.offsets[0]
+ log.info('Resetting topic-partition offset to %s for %s:%d',
+ self.auto_offset_reset, self.topic, partition)
+ try:
+ (resp, ) = self.client.send_offset_request(reqs)
+ except KafkaError as e:
+ log.error('%s sending offset request for %s:%d',
+ e.__class__.__name__, self.topic, partition)
+ else:
+ self.offsets[partition] = resp.offsets[0]
+ self.fetch_offsets[partition] = resp.offsets[0]
+ return resp.offsets[0]
def provide_partition_info(self):
"""
diff --git a/test/test_consumer.py b/test/test_consumer.py
index 08fd620..df15115 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -4,7 +4,7 @@ from . import unittest
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.common import (
- KafkaConfigurationError, FetchResponse,
+ KafkaConfigurationError, FetchResponse, OffsetFetchResponse,
FailedPayloadsError, OffsetAndMessage,
NotLeaderForPartitionError, UnknownTopicOrPartitionError
)
@@ -25,10 +25,11 @@ class TestMultiProcessConsumer(unittest.TestCase):
client = MagicMock()
partitions = (0,)
with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
- consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
+ MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
+class TestSimpleConsumer(unittest.TestCase):
def test_simple_consumer_failed_payloads(self):
client = MagicMock()
consumer = SimpleConsumer(client, group=None,
@@ -80,6 +81,45 @@ class TestMultiProcessConsumer(unittest.TestCase):
with self.assertRaises(UnknownTopicOrPartitionError):
consumer.get_messages(20)
+ def test_simple_consumer_commit_does_not_raise(self):
+ client = MagicMock()
+ client.get_partition_ids_for_topic.return_value = [0, 1]
+
+ def mock_offset_fetch_request(group, payloads, **kwargs):
+ return [OffsetFetchResponse(p.topic, p.partition, 0, b'', 0) for p in payloads]
+
+ client.send_offset_fetch_request.side_effect = mock_offset_fetch_request
+
+ def mock_offset_commit_request(group, payloads, **kwargs):
+ raise FailedPayloadsError(payloads[0])
+
+ client.send_offset_commit_request.side_effect = mock_offset_commit_request
+
+ consumer = SimpleConsumer(client, group='foobar',
+ topic='topic', partitions=[0, 1],
+ auto_commit=False)
+
+ # Mock internal commit check
+ consumer.count_since_commit = 10
+
+ # This should not raise an exception
+ self.assertFalse(consumer.commit(partitions=[0, 1]))
+
+ def test_simple_consumer_reset_partition_offset(self):
+ client = MagicMock()
+
+ def mock_offset_request(payloads, **kwargs):
+ raise FailedPayloadsError(payloads[0])
+
+ client.send_offset_request.side_effect = mock_offset_request
+
+ consumer = SimpleConsumer(client, group='foobar',
+ topic='topic', partitions=[0, 1],
+ auto_commit=False)
+
+ # This should not raise an exception
+ self.assertEqual(consumer.reset_partition_offset(0), None)
+
@staticmethod
def fail_requests_factory(error_factory):
# Mock so that only the first request gets a valid response