From 5b3042cb6d4fe3bb70e30e2ce7e776d8f124a27e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 9 Apr 2016 09:24:05 -0700 Subject: Handle SSL HANDSHAKE state in KafkaClient state change handler --- kafka/client_async.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index 36e808c..9271008 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -168,8 +168,10 @@ class KafkaClient(object): def _conn_state_change(self, node_id, conn): if conn.connecting(): - self._connecting.add(node_id) - self._selector.register(conn._sock, selectors.EVENT_WRITE) + # SSL connections can enter this state 2x (second during Handshake) + if node_id not in self._connecting: + self._connecting.add(node_id) + self._selector.register(conn._sock, selectors.EVENT_WRITE) elif conn.connected(): log.debug("Node %s connected", node_id) -- cgit v1.2.1 From d1bfccfce1a9c1784ad17a38faf84d8fdab1e8ce Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 4 Apr 2016 19:52:09 -0700 Subject: Check for pending ssl bytes in KafkaClient loop --- kafka/client_async.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index 9271008..b91ae35 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -414,7 +414,9 @@ class KafkaClient(object): def _poll(self, timeout, sleep=True): # select on reads across all connected sockets, blocking up to timeout assert self.in_flight_request_count() > 0 or self._connecting or sleep + responses = [] + processed = set() for key, events in self._selector.select(timeout): if key.fileobj is self._wake_r: self._clear_wake_fd() @@ -422,6 +424,7 @@ class KafkaClient(object): elif not (events & selectors.EVENT_READ): continue conn = key.data + processed.add(conn) while conn.in_flight_requests: response = conn.recv() # Note: conn.recv runs callbacks / errbacks @@ -430,6 +433,15 @@ class KafkaClient(object): if not response: break responses.append(response) + + # Check for additional pending SSL bytes + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + # TODO: optimize + for conn in self._conns.values(): + if conn not in processed and conn.connected() and conn._sock.pending(): + response = conn.recv() + if response: + responses.append(response) return responses def in_flight_request_count(self, node_id=None): -- cgit v1.2.1 From 01f03656cc613a2281d22521da4a016c7fa4a8ba Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 4 Apr 2016 19:54:01 -0700 Subject: Add SSL configuration kwargs to KafkaClient, KafkaConsumer, KafkaProducer --- kafka/client_async.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) (limited to 'kafka/client_async.py') diff --git a/kafka/client_async.py b/kafka/client_async.py index b91ae35..2eb86cf 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -53,6 +53,12 @@ class KafkaClient(object): 'send_buffer_bytes': None, 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, } def __init__(self, **configs): @@ -90,6 +96,21 @@ class KafkaClient(object): brokers or partitions. Default: 300000 retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the brokers hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + veriication. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. """ self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: -- cgit v1.2.1