diff options
-rw-r--r-- | kafka/consumer.py | 7 | ||||
-rw-r--r-- | kafka/util.py | 3 |
2 files changed, 8 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index c9f12e1..fe7881a 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -59,7 +59,6 @@ class SimpleConsumer(object): if auto_commit is True and auto_commit_every_t is not None: self.commit_timer = ReentrantTimer(auto_commit_every_t, self._timed_commit) - self.commit_timer.start() def get_or_init_offset_callback(resp): if resp.error == ErrorMapping.NO_ERROR: @@ -157,7 +156,7 @@ class SimpleConsumer(object): self.commit() # Once the commit is done, start the timer again - self.commit_timer.start() + # self.commit_timer.start() def commit(self, partitions=[]): """ @@ -224,6 +223,10 @@ class SimpleConsumer(object): if len(iters) == 0: break + # Now that we are consuming data, start the commit thread + if self.commit_timer and not self.commit_timer.is_active: + self.commit_timer.start() + for partition, it in iters.items(): try: yield it.next() diff --git a/kafka/util.py b/kafka/util.py index 10bf838..b3112d5 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -86,14 +86,17 @@ class ReentrantTimer(object): self.timer = None self.t = t self.fn = fn + self.is_active = False def start(self): if self.timer is not None: self.timer.cancel() self.timer = Timer(self.t / 1000., self.fn) + self.is_active = True self.timer.start() def stop(self): self.timer.cancel() self.timer = None + self.is_active = False |