summaryrefslogtreecommitdiff
path: root/kafka/consumer/base.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-06-10 09:48:19 -0700
committerDana Powers <dana.powers@rd.io>2015-06-10 10:15:45 -0700
commitf021609911d25dbb7ef20410890483c1439edd58 (patch)
tree1927501c0d8f64c6b8745661b9af953947b089d5 /kafka/consumer/base.py
parentecdcdf531d232a923f4869f421d3a908dd735d4a (diff)
downloadkafka-python-f021609911d25dbb7ef20410890483c1439edd58.tar.gz
Change Consumer commit() to return True/False and log error; dont raise client exceptions
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r--kafka/consumer/base.py30
1 files changed, 18 insertions, 12 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 6365cfa..b5383a3 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -8,7 +8,7 @@ from threading import Lock
import kafka.common
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
- UnknownTopicOrPartitionError, check_error
+ UnknownTopicOrPartitionError, check_error, KafkaError
)
from kafka.util import kafka_bytestring, ReentrantTimer
@@ -114,12 +114,13 @@ class Consumer(object):
self.offsets[resp.partition] = resp.offset
def commit(self, partitions=None):
- """
- Commit offsets for this consumer
+ """Commit stored offsets to Kafka via OffsetCommitRequest (v0)
Keyword Arguments:
partitions (list): list of partitions to commit, default is to commit
all of them
+
+ Returns: True on success, False on failure
"""
# short circuit if nothing happened. This check is kept outside
@@ -135,22 +136,27 @@ class Consumer(object):
reqs = []
if partitions is None: # commit all partitions
- partitions = self.offsets.keys()
+ partitions = list(self.offsets.keys())
+ log.info('Committing new offsets for %s, partitions %s',
+ self.topic, partitions)
for partition in partitions:
offset = self.offsets[partition]
- log.debug("Commit offset %d in SimpleConsumer: "
- "group=%s, topic=%s, partition=%s" %
- (offset, self.group, self.topic, partition))
+ log.debug('Commit offset %d in SimpleConsumer: '
+ 'group=%s, topic=%s, partition=%s',
+ offset, self.group, self.topic, partition)
reqs.append(OffsetCommitRequest(self.topic, partition,
offset, None))
- resps = self.client.send_offset_commit_request(self.group, reqs)
- for resp in resps:
- kafka.common.check_error(resp)
-
- self.count_since_commit = 0
+ try:
+ self.client.send_offset_commit_request(self.group, reqs)
+ except KafkaError as e:
+ log.error('%s saving offsets: %s', e.__class__.__name__, e)
+ return False
+ else:
+ self.count_since_commit = 0
+ return True
def _auto_commit(self):
"""