summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py7
1 files changed, 5 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c9f12e1..fe7881a 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -59,7 +59,6 @@ class SimpleConsumer(object):
if auto_commit is True and auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t,
self._timed_commit)
- self.commit_timer.start()
def get_or_init_offset_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
@@ -157,7 +156,7 @@ class SimpleConsumer(object):
self.commit()
# Once the commit is done, start the timer again
- self.commit_timer.start()
+ # self.commit_timer.start()
def commit(self, partitions=[]):
"""
@@ -224,6 +223,10 @@ class SimpleConsumer(object):
if len(iters) == 0:
break
+ # Now that we are consuming data, start the commit thread
+ if self.commit_timer and not self.commit_timer.is_active:
+ self.commit_timer.start()
+
for partition, it in iters.items():
try:
yield it.next()