diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-11 14:44:45 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-11 14:44:45 +0530 |
commit | a4601d3a1bf6792e0d57e600f48e891ef2be1528 (patch) | |
tree | df99626300f685359e0c5ec9569c978464278366 | |
parent | 77b8301e253774e09d13ff6b7c132fd51e6d9091 (diff) | |
download | kafka-python-a4601d3a1bf6792e0d57e600f48e891ef2be1528.tar.gz |
Spawn the commit thread only if necessary
If there are no messages being consumed, the timer keeps
creating new threads at the specified intervals. This may
not be necessary. We can control this behaviour such that
the timer thread is started only when a message is consumed
-rw-r--r-- | kafka/consumer.py | 7 | ||||
-rw-r--r-- | kafka/util.py | 3 |
2 files changed, 8 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index c9f12e1..fe7881a 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -59,7 +59,6 @@ class SimpleConsumer(object): if auto_commit is True and auto_commit_every_t is not None: self.commit_timer = ReentrantTimer(auto_commit_every_t, self._timed_commit) - self.commit_timer.start() def get_or_init_offset_callback(resp): if resp.error == ErrorMapping.NO_ERROR: @@ -157,7 +156,7 @@ class SimpleConsumer(object): self.commit() # Once the commit is done, start the timer again - self.commit_timer.start() + # self.commit_timer.start() def commit(self, partitions=[]): """ @@ -224,6 +223,10 @@ class SimpleConsumer(object): if len(iters) == 0: break + # Now that we are consuming data, start the commit thread + if self.commit_timer and not self.commit_timer.is_active: + self.commit_timer.start() + for partition, it in iters.items(): try: yield it.next() diff --git a/kafka/util.py b/kafka/util.py index 10bf838..b3112d5 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -86,14 +86,17 @@ class ReentrantTimer(object): self.timer = None self.t = t self.fn = fn + self.is_active = False def start(self): if self.timer is not None: self.timer.cancel() self.timer = Timer(self.t / 1000., self.fn) + self.is_active = True self.timer.start() def stop(self): self.timer.cancel() self.timer = None + self.is_active = False |