diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index c9f12e1..fe7881a 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -59,7 +59,6 @@ class SimpleConsumer(object): if auto_commit is True and auto_commit_every_t is not None: self.commit_timer = ReentrantTimer(auto_commit_every_t, self._timed_commit) - self.commit_timer.start() def get_or_init_offset_callback(resp): if resp.error == ErrorMapping.NO_ERROR: @@ -157,7 +156,7 @@ class SimpleConsumer(object): self.commit() # Once the commit is done, start the timer again - self.commit_timer.start() + # self.commit_timer.start() def commit(self, partitions=[]): """ @@ -224,6 +223,10 @@ class SimpleConsumer(object): if len(iters) == 0: break + # Now that we are consuming data, start the commit thread + if self.commit_timer and not self.commit_timer.is_active: + self.commit_timer.start() + for partition, it in iters.items(): try: yield it.next() |