diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-09 10:29:08 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-09 10:29:08 -0700 |
commit | 0c94b83a2dff8113b5fd7c16df8a11ca03c4377b (patch) | |
tree | 54c8520e94af2d72ca715c4db9bb855fbfa5574d /kafka | |
parent | cda2d59da4ff952adae1a75d906eaa3a99ac7f67 (diff) | |
parent | 097198cceaed97d5b804166d0c76a816c8dfead0 (diff) | |
download | kafka-python-0c94b83a2dff8113b5fd7c16df8a11ca03c4377b.tar.gz |
Merge pull request #621 from dpkp/ssl_support
Support SSL connections
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client_async.py | 39 | ||||
-rw-r--r-- | kafka/conn.py | 94 | ||||
-rw-r--r-- | kafka/consumer/group.py | 21 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 21 |
4 files changed, 170 insertions, 5 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 36e808c..2eb86cf 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -53,6 +53,12 @@ class KafkaClient(object): 'send_buffer_bytes': None, 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, } def __init__(self, **configs): @@ -90,6 +96,21 @@ class KafkaClient(object): brokers or partitions. Default: 300000 retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the broker's hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + verification. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. 
""" self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -168,8 +189,10 @@ class KafkaClient(object): def _conn_state_change(self, node_id, conn): if conn.connecting(): - self._connecting.add(node_id) - self._selector.register(conn._sock, selectors.EVENT_WRITE) + # SSL connections can enter this state 2x (second during Handshake) + if node_id not in self._connecting: + self._connecting.add(node_id) + self._selector.register(conn._sock, selectors.EVENT_WRITE) elif conn.connected(): log.debug("Node %s connected", node_id) @@ -412,7 +435,9 @@ class KafkaClient(object): def _poll(self, timeout, sleep=True): # select on reads across all connected sockets, blocking up to timeout assert self.in_flight_request_count() > 0 or self._connecting or sleep + responses = [] + processed = set() for key, events in self._selector.select(timeout): if key.fileobj is self._wake_r: self._clear_wake_fd() @@ -420,6 +445,7 @@ class KafkaClient(object): elif not (events & selectors.EVENT_READ): continue conn = key.data + processed.add(conn) while conn.in_flight_requests: response = conn.recv() # Note: conn.recv runs callbacks / errbacks @@ -428,6 +454,15 @@ class KafkaClient(object): if not response: break responses.append(response) + + # Check for additional pending SSL bytes + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + # TODO: optimize + for conn in self._conns.values(): + if conn not in processed and conn.connected() and conn._sock.pending(): + response = conn.recv() + if response: + responses.append(response) return responses def in_flight_request_count(self, node_id=None): diff --git a/kafka/conn.py b/kafka/conn.py index 28c09d9..f13ab64 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,6 +5,7 @@ import logging import io from random import shuffle import socket +import ssl import struct from threading import local import time @@ -29,11 +30,25 @@ log = logging.getLogger(__name__) DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 DEFAULT_KAFKA_PORT = 9092 +# 
support older ssl libraries +try: + assert ssl.SSLWantReadError + assert ssl.SSLWantWriteError + assert ssl.SSLZeroReturnError +except: + log.warning('old ssl module detected.' + ' ssl error handling may not operate cleanly.' + ' Consider upgrading to python 3.5 or 2.7') + ssl.SSLWantReadError = ssl.SSLError + ssl.SSLWantWriteError = ssl.SSLError + ssl.SSLZeroReturnError = ssl.SSLError + class ConnectionStates(object): DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' CONNECTING = '<connecting>' + HANDSHAKE = '<handshake>' CONNECTED = '<connected>' @@ -49,6 +64,12 @@ class BrokerConnection(object): 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, 'api_version': (0, 8, 2), # default to most restrictive 'state_change_callback': lambda conn: True, } @@ -66,6 +87,9 @@ class BrokerConnection(object): self.state = ConnectionStates.DISCONNECTED self._sock = None + self._ssl_context = None + if self.config['ssl_context'] is not None: + self._ssl_context = self.config['ssl_context'] self._rbuffer = io.BytesIO() self._receiving = False self._next_payload_bytes = 0 @@ -87,6 +111,8 @@ class BrokerConnection(object): self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.config['send_buffer_bytes']) self._sock.setblocking(False) + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + self._wrap_ssl() self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() self.config['state_change_callback'](self) @@ -103,7 +129,11 @@ class BrokerConnection(object): # Connection succeeded if not ret or ret == errno.EISCONN: log.debug('%s: established TCP connection', str(self)) - self.state = ConnectionStates.CONNECTED + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + log.debug('%s: initiating SSL handshake', str(self)) + 
self.state = ConnectionStates.HANDSHAKE + else: + self.state = ConnectionStates.CONNECTED self.config['state_change_callback'](self) # Connection failed @@ -122,8 +152,60 @@ class BrokerConnection(object): else: pass + if self.state is ConnectionStates.HANDSHAKE: + if self._try_handshake(): + log.debug('%s: completed SSL handshake.', str(self)) + self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) + return self.state + def _wrap_ssl(self): + assert self.config['security_protocol'] in ('SSL', 'SASL_SSL') + if self._ssl_context is None: + log.debug('%s: configuring default SSL Context', str(self)) + self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # pylint: disable=no-member + self._ssl_context.options |= ssl.OP_NO_SSLv2 # pylint: disable=no-member + self._ssl_context.options |= ssl.OP_NO_SSLv3 # pylint: disable=no-member + self._ssl_context.verify_mode = ssl.CERT_OPTIONAL + if self.config['ssl_check_hostname']: + self._ssl_context.check_hostname = True + if self.config['ssl_cafile']: + log.info('%s: Loading SSL CA from %s', str(self), self.config['ssl_cafile']) + self._ssl_context.load_verify_locations(self.config['ssl_cafile']) + self._ssl_context.verify_mode = ssl.CERT_REQUIRED + if self.config['ssl_certfile'] and self.config['ssl_keyfile']: + log.info('%s: Loading SSL Cert from %s', str(self), self.config['ssl_certfile']) + log.info('%s: Loading SSL Key from %s', str(self), self.config['ssl_keyfile']) + self._ssl_context.load_cert_chain( + certfile=self.config['ssl_certfile'], + keyfile=self.config['ssl_keyfile']) + log.debug('%s: wrapping socket in ssl context', str(self)) + try: + self._sock = self._ssl_context.wrap_socket( + self._sock, + server_hostname=self.host, + do_handshake_on_connect=False) + except ssl.SSLError: + log.exception('%s: Failed to wrap socket in SSLContext!', str(self)) + self.close() + self.last_failure = time.time() + + def _try_handshake(self): + assert self.config['security_protocol'] in ('SSL', 
'SASL_SSL') + try: + self._sock.do_handshake() + return True + # old ssl in python2.6 will swallow all SSLErrors here... + except (ssl.SSLWantReadError, ssl.SSLWantWriteError): + pass + except ssl.SSLZeroReturnError: + log.warning('SSL connection closed by server during handshake.') + self.close() + # Other SSLErrors will be raised to user + + return False + def blacked_out(self): """ Return true if we are disconnected from the given node and can't @@ -140,8 +222,10 @@ class BrokerConnection(object): return self.state is ConnectionStates.CONNECTED def connecting(self): - """Return True iff socket is in intermediate connecting state.""" - return self.state is ConnectionStates.CONNECTING + """Returns True if still connecting (this may encompass several + different states, such as SSL handshake, authorization, etc).""" + return self.state in (ConnectionStates.CONNECTING, + ConnectionStates.HANDSHAKE) def disconnected(self): """Return True iff socket is closed""" @@ -260,6 +344,8 @@ class BrokerConnection(object): # An extremely small, but non-zero, probability that there are # more than 0 but not yet 4 bytes available to read self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell())) + except ssl.SSLWantReadError: + return None except ConnectionError as e: if six.PY2 and e.errno == errno.EWOULDBLOCK: return None @@ -286,6 +372,8 @@ class BrokerConnection(object): staged_bytes = self._rbuffer.tell() try: self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes)) + except ssl.SSLWantReadError: + return None except ConnectionError as e: # Extremely small chance that we have exactly 4 bytes for a # header, but nothing to read in the body yet diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 151e644..0a78e7f 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -122,6 +122,21 @@ class KafkaConsumer(six.Iterator): consumer_timeout_ms (int): number of millisecond to throw a timeout exception to the consumer if no 
message is available for consumption. Default: -1 (dont throw exception) + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the broker's hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + verification. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. api_version (str): specify which kafka API version to use. 0.9 enables full group coordination features; 0.8.2 enables kafka-storage offset commits; 0.8.1 enables zookeeper-storage @@ -158,6 +173,12 @@ class KafkaConsumer(six.Iterator): 'send_buffer_bytes': None, 'receive_buffer_bytes': None, 'consumer_timeout_ms': -1, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, 'api_version': 'auto', 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet #'metric_reporters': None, diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0aecdc5..1862f8d 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -192,6 +192,21 @@ class KafkaProducer(object): max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5. + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. 
+ ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the broker's hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + verification. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. api_version (str): specify which kafka API version to use. If set to 'auto', will attempt to infer the broker version by probing various APIs. Default: auto @@ -222,6 +237,12 @@ class KafkaProducer(object): 'send_buffer_bytes': None, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, 'api_version': 'auto', } |