summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMehdi Abaakouk <sileht@redhat.com>2016-12-08 11:31:22 +0100
committerMehdi Abaakouk <sileht@redhat.com>2017-01-02 11:46:48 +0100
commit1ee3d7001aa07904eb664b989a879b73f3310d99 (patch)
tree8af602239fdf8aa50b55044f6f3109f91adc8162
parent488594936a52145c778c89fc88adca722ae8bd72 (diff)
downloadoslo-messaging-1ee3d7001aa07904eb664b989a879b73f3310d99.tar.gz
kafka: remove no really implemented feature
Change-Id: I6f2693c48d5d0ac1af68b3d4bb5ff361facef977
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py28
1 files changed, 4 insertions, 24 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 59d2ff7..15f7c3d 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -129,11 +129,6 @@ class Connection(object):
self.group_id = driver_conf.consumer_group
self.url = url
self._parse_url()
- # TODO(Support for manual/auto_commit functionality)
- # When auto_commit is False, consumer can manually notify
- # the completion of the subscription.
- # Currently we don't support for non auto commit option
- self.auto_commit = True
self._consume_loop_stopped = False
def _parse_url(self):
@@ -229,17 +224,6 @@ class Connection(object):
self.consumer.close()
self.consumer = None
- def commit(self):
- """Commit is used by subscribers belonging to the same group.
- After subscribing messages, commit is called to prevent
- the other subscribers which belong to the same group
- from re-subscribing the same messages.
-
- Currently self.auto_commit option is always True,
- so we don't need to call this function.
- """
- self.consumer.commit()
-
def _close_producer(self):
with self.producer_lock:
if self.producer:
@@ -260,6 +244,10 @@ class Connection(object):
@with_reconnect()
def declare_topic_consumer(self, topics, group=None):
+ # TODO(Support for manual/auto_commit functionality)
+ # When auto_commit is False, consumer can manually notify
+ # the completion of the subscription.
+ # Currently we don't support for non auto commit option
self.consumer = kafka.KafkaConsumer(
*topics, group_id=(group or self.group_id),
bootstrap_servers=self.hostaddrs,
@@ -308,14 +296,6 @@ class KafkaListener(base.PollStyleListener):
def cleanup(self):
self.conn.close()
- def commit(self):
- # TODO(Support for manually/auto commit functionality)
- # It's better to allow users to commit manually and support for
- # self.auto_commit = False option. For now, this commit function
- # is meaningless since user couldn't call this function and
- # auto_commit option is always True.
- self.conn.commit()
-
class KafkaDriver(base.BaseDriver):
"""Note: Current implementation of this driver is experimental.