diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-09-29 17:04:17 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-29 17:04:17 -0700 |
commit | 392d674be6641078717a4d87e471916c9a4bbb22 (patch) | |
tree | baee4165315b4c99939a944a6a7b893f0c17e36b /kafka/client_async.py | |
parent | 9de12d3f03236988a60e6cd79a50ffa5165cf735 (diff) | |
download | kafka-python-392d674be6641078717a4d87e471916c9a4bbb22.tar.gz |
Send socket data via non-blocking IO with send buffer (#1912)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 29 |
1 files changed, 27 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index ac2d364..9b9cb8f 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -207,6 +207,7 @@ class KafkaClient(object): self._conns = Dict() # object to support weakrefs self._api_versions = None self._connecting = set() + self._sending = set() self._refresh_on_disconnects = True self._last_bootstrap = 0 self._bootstrap_fails = 0 @@ -532,6 +533,7 @@ class KafkaClient(object): # we will need to call send_pending_requests() # to trigger network I/O future = conn.send(request, blocking=False) + self._sending.add(conn) # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding @@ -604,14 +606,23 @@ class KafkaClient(object): return responses + def _register_send_sockets(self): + while self._sending: + conn = self._sending.pop() + try: + key = self._selector.get_key(conn._sock) + events = key.events | selectors.EVENT_WRITE + self._selector.modify(key.fileobj, events, key.data) + except KeyError: + self._selector.register(conn._sock, selectors.EVENT_WRITE, conn) + def _poll(self, timeout): # This needs to be locked, but since it is only called from within the # locked section of poll(), there is no additional lock acquisition here processed = set() # Send pending requests first, before polling for responses - for conn in six.itervalues(self._conns): - conn.send_pending_requests() + self._register_send_sockets() start_select = time.time() ready = self._selector.select(timeout) @@ -623,10 +634,24 @@ class KafkaClient(object): if key.fileobj is self._wake_r: self._clear_wake_fd() continue + + # Send pending requests if socket is ready to write if events & selectors.EVENT_WRITE: conn = key.data if conn.connecting(): conn.connect() + else: + if conn.send_pending_requests_v2(): + # If send is complete, we dont need to track write readiness + # for this socket anymore + if key.events ^ selectors.EVENT_WRITE: + self._selector.modify( + key.fileobj, + key.events ^ selectors.EVENT_WRITE, + key.data) + else: + self._selector.unregister(key.fileobj) + if not (events & selectors.EVENT_READ): continue conn = key.data |