summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-07 16:23:24 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-07 22:53:04 -0700
commit12c8f0cd3001d03e9bc2d6c01c9551ea22dec12a (patch)
tree96425c0caef56347d58e3ad93f627ff68f374bb9
parentbb40619cba0da4c90bb34707959f3eaca0bb56f0 (diff)
downloadkafka-python-12c8f0cd3001d03e9bc2d6c01c9551ea22dec12a.tar.gz
Make _wake_r socket non-blocking; drop select from _clear_wake_fd
-rw-r--r--kafka/client_async.py7
1 files changed, 4 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e921fa4..ca51987 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -98,6 +98,7 @@ class KafkaClient(object):
self._bootstrap_fails = 0
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._wake_r, self._wake_w = socket.socketpair()
+ self._wake_r.setblocking(False)
def __del__(self):
self._wake_r.close()
@@ -682,10 +683,10 @@ class KafkaClient(object):
def _clear_wake_fd(self):
while True:
- fds, _, _ = select.select([self._wake_r], [], [], 0)
- if not fds:
+ try:
+ self._wake_r.recv(1)
+ except:
break
- self._wake_r.recv(1)
class DelayedTaskQueue(object):