summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMahendra M <mahendra.m@gmail.com>2013-06-11 14:44:45 +0530
committerMahendra M <mahendra.m@gmail.com>2013-06-11 14:44:45 +0530
commita4601d3a1bf6792e0d57e600f48e891ef2be1528 (patch)
treedf99626300f685359e0c5ec9569c978464278366
parent77b8301e253774e09d13ff6b7c132fd51e6d9091 (diff)
downloadkafka-python-a4601d3a1bf6792e0d57e600f48e891ef2be1528.tar.gz
Spawn the commit thread only if necessary
If there are no messages being consumed, the timer keeps creating new threads at the specified intervals. This may not be necessary. We can control this behaviour such that the timer thread is started only when a message is consumed
-rw-r--r--kafka/consumer.py7
-rw-r--r--kafka/util.py3
2 files changed, 8 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index c9f12e1..fe7881a 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -59,7 +59,6 @@ class SimpleConsumer(object):
if auto_commit is True and auto_commit_every_t is not None:
self.commit_timer = ReentrantTimer(auto_commit_every_t,
self._timed_commit)
- self.commit_timer.start()
def get_or_init_offset_callback(resp):
if resp.error == ErrorMapping.NO_ERROR:
@@ -157,7 +156,7 @@ class SimpleConsumer(object):
self.commit()
# Once the commit is done, start the timer again
- self.commit_timer.start()
+ # self.commit_timer.start()
def commit(self, partitions=[]):
"""
@@ -224,6 +223,10 @@ class SimpleConsumer(object):
if len(iters) == 0:
break
+ # Now that we are consuming data, start the commit thread
+ if self.commit_timer and not self.commit_timer.is_active:
+ self.commit_timer.start()
+
for partition, it in iters.items():
try:
yield it.next()
diff --git a/kafka/util.py b/kafka/util.py
index 10bf838..b3112d5 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -86,14 +86,17 @@ class ReentrantTimer(object):
self.timer = None
self.t = t
self.fn = fn
+ self.is_active = False
def start(self):
if self.timer is not None:
self.timer.cancel()
self.timer = Timer(self.t / 1000., self.fn)
+ self.is_active = True
self.timer.start()
def stop(self):
self.timer.cancel()
self.timer = None
+ self.is_active = False