summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/consumer.py7
-rw-r--r--kafka/util.py3
2 files changed, 8 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c9f12e1..fe7881a 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -59,7 +59,6 @@ class SimpleConsumer(object):
if auto_commit is True and auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t,
self._timed_commit)
- self.commit_timer.start()
def get_or_init_offset_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
@@ -157,7 +156,7 @@ class SimpleConsumer(object):
self.commit()
# Once the commit is done, start the timer again
- self.commit_timer.start()
+ # self.commit_timer.start()
def commit(self, partitions=[]):
"""
@@ -224,6 +223,10 @@ class SimpleConsumer(object):
if len(iters) == 0:
break
+ # Now that we are consuming data, start the commit thread
+ if self.commit_timer and not self.commit_timer.is_active:
+ self.commit_timer.start()
+
for partition, it in iters.items():
try:
yield it.next()
diff --git a/kafka/util.py b/kafka/util.py
index 10bf838..b3112d5 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -86,14 +86,17 @@ class ReentrantTimer(object):
self.timer = None
self.t = t
self.fn = fn
+ self.is_active = False
def start(self):
if self.timer is not None:
self.timer.cancel()
self.timer = Timer(self.t / 1000., self.fn)
+ self.is_active = True
self.timer.start()
def stop(self):
self.timer.cancel()
self.timer = None
+ self.is_active = False