diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-10 09:48:19 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-10 10:15:45 -0700 |
commit | f021609911d25dbb7ef20410890483c1439edd58 (patch) | |
tree | 1927501c0d8f64c6b8745661b9af953947b089d5 /kafka/consumer/base.py | |
parent | ecdcdf531d232a923f4869f421d3a908dd735d4a (diff) | |
download | kafka-python-f021609911d25dbb7ef20410890483c1439edd58.tar.gz |
Change Consumer commit() to return True/False and log error; dont raise client exceptions
Diffstat (limited to 'kafka/consumer/base.py')
-rw-r--r-- | kafka/consumer/base.py | 30 |
1 files changed, 18 insertions, 12 deletions
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 6365cfa..b5383a3 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -8,7 +8,7 @@ from threading import Lock import kafka.common from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, - UnknownTopicOrPartitionError, check_error + UnknownTopicOrPartitionError, check_error, KafkaError ) from kafka.util import kafka_bytestring, ReentrantTimer @@ -114,12 +114,13 @@ class Consumer(object): self.offsets[resp.partition] = resp.offset def commit(self, partitions=None): - """ - Commit offsets for this consumer + """Commit stored offsets to Kafka via OffsetCommitRequest (v0) Keyword Arguments: partitions (list): list of partitions to commit, default is to commit all of them + + Returns: True on success, False on failure """ # short circuit if nothing happened. This check is kept outside @@ -135,22 +136,27 @@ class Consumer(object): reqs = [] if partitions is None: # commit all partitions - partitions = self.offsets.keys() + partitions = list(self.offsets.keys()) + log.info('Committing new offsets for %s, partitions %s', + self.topic, partitions) for partition in partitions: offset = self.offsets[partition] - log.debug("Commit offset %d in SimpleConsumer: " - "group=%s, topic=%s, partition=%s" % - (offset, self.group, self.topic, partition)) + log.debug('Commit offset %d in SimpleConsumer: ' + 'group=%s, topic=%s, partition=%s', + offset, self.group, self.topic, partition) reqs.append(OffsetCommitRequest(self.topic, partition, offset, None)) - resps = self.client.send_offset_commit_request(self.group, reqs) - for resp in resps: - kafka.common.check_error(resp) - - self.count_since_commit = 0 + try: + self.client.send_offset_commit_request(self.group, reqs) + except KafkaError as e: + log.error('%s saving offsets: %s', e.__class__.__name__, e) + return False + else: + self.count_since_commit = 0 + return True def _auto_commit(self): """ |