diff options
-rw-r--r-- | .travis.yml | 2 | ||||
-rw-r--r-- | README.rst | 10 | ||||
-rwxr-xr-x | build_integration.sh | 2 | ||||
-rw-r--r-- | kafka/conn.py | 297 | ||||
-rw-r--r-- | kafka/consumer/group.py | 15 | ||||
-rw-r--r-- | kafka/errors.py | 6 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 21 | ||||
-rw-r--r-- | kafka/protocol/parser.py | 177 | ||||
-rw-r--r-- | servers/0.11.0.1/resources/kafka.properties | 142 | ||||
-rw-r--r-- | servers/0.11.0.1/resources/log4j.properties | 25 | ||||
-rw-r--r-- | servers/0.11.0.1/resources/zookeeper.properties | 21 | ||||
-rw-r--r-- | test/fixtures.py | 2 |
12 files changed, 504 insertions, 216 deletions
diff --git a/.travis.yml b/.travis.yml index 21d4d79..75be510 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,7 +11,7 @@ env: - KAFKA_VERSION=0.8.2.2 - KAFKA_VERSION=0.9.0.1 - KAFKA_VERSION=0.10.2.1 - - KAFKA_VERSION=0.11.0.0 + - KAFKA_VERSION=0.11.0.1 sudo: false @@ -70,6 +70,8 @@ that expose basic message attributes: topic, partition, offset, key, and value: >>> for msg in consumer: ... assert isinstance(msg.value, dict) +>>> # Get consumer metrics +>>> metrics = consumer.metrics() KafkaProducer ************* @@ -110,6 +112,9 @@ for more details. >>> for i in range(1000): ... producer.send('foobar', b'msg %d' % i) +>>> # Get producer performance metrics +>>> metrics = producer.metrics() + Thread safety ************* @@ -122,8 +127,8 @@ multiprocessing is recommended. Compression *********** -kafka-python supports gzip compression/decompression natively. To produce or consume lz4 -compressed messages, you should install python-lz4 (pip install lz4). +kafka-python supports gzip compression/decompression natively. To produce or consume lz4 +compressed messages, you should install python-lz4 (pip install lz4). To enable snappy compression/decompression install python-snappy (also requires snappy library). See <https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install> for more information. @@ -138,7 +143,6 @@ leveraged to enable a KafkaClient.check_version() method that probes a kafka broker and attempts to identify which version it is running (0.8.0 to 0.11). - Low-level ********* diff --git a/build_integration.sh b/build_integration.sh index 28e501d..7ea22ef 100755 --- a/build_integration.sh +++ b/build_integration.sh @@ -1,6 +1,6 @@ #!/bin/bash -: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.1 0.11.0.0"} +: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.1 0.11.0.1"} : ${SCALA_VERSION:=2.11} : ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/} : ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git} diff --git a/kafka/conn.py b/kafka/conn.py index dbe212a..e10d4f1 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -6,19 +6,19 @@ import errno import logging from random import shuffle, uniform import socket -import time +import struct import sys +import time from kafka.vendor import six import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.api import RequestHeader from kafka.protocol.admin import SaslHandShakeRequest -from kafka.protocol.commit import GroupCoordinatorResponse, OffsetFetchRequest -from kafka.protocol.frame import KafkaBytes +from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.parser import KafkaProtocol from kafka.protocol.types import Int32 from kafka.version import __version__ @@ -73,9 +73,6 @@ class ConnectionStates(object): CONNECTED = '<connected>' AUTHENTICATING = '<authenticating>' -InFlightRequest = collections.namedtuple('InFlightRequest', - ['request', 'response_type', 'correlation_id', 'future', 'timestamp']) - class BrokerConnection(object): """Initialize a Kafka broker connection @@ -230,6 +227,9 @@ class BrokerConnection(object): assert gssapi is not None, 'GSSAPI lib not available' assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' + self._protocol = KafkaProtocol( + client_id=self.config['client_id'], + api_version=self.config['api_version']) self.state = ConnectionStates.DISCONNECTED self._reset_reconnect_backoff() self._sock = None @@ -237,12 +237,7 @@ class BrokerConnection(object): if self.config['ssl_context'] is not None: self._ssl_context = self.config['ssl_context'] self._sasl_auth_future = None - self._header = KafkaBytes(4) - self._rbuffer = None - self._receiving = False self.last_attempt = 0 - self._processing = False - self._correlation_id = 0 self._gai = None self._gai_index = 0 self._sensors = None @@ -304,12 +299,15 @@ class BrokerConnection(object): self._sock.setsockopt(*option) self._sock.setblocking(False) + self.last_attempt = time.time() + self.state = ConnectionStates.CONNECTING if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): self._wrap_ssl() - log.info('%s: connecting to %s:%d', self, self.host, self.port) - self.state = ConnectionStates.CONNECTING - self.last_attempt = time.time() - self.config['state_change_callback'](self) + # _wrap_ssl can alter the connection state -- disconnects on failure + # so we need to double check that we are still connecting before + if self.connecting(): + self.config['state_change_callback'](self) + log.info('%s: connecting to %s:%d', self, self.host, self.port) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -372,10 +370,12 @@ class BrokerConnection(object): if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') if self._try_authenticate(): - log.debug('%s: Connection complete.', self) - self.state = ConnectionStates.CONNECTED - self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + # _try_authenticate has side-effects: possibly disconnected on socket errors + if self.state is ConnectionStates.AUTHENTICATING: + log.debug('%s: Connection complete.', self) + self.state = ConnectionStates.CONNECTED + self._reset_reconnect_backoff() + self.config['state_change_callback'](self) return self.state @@ -402,10 +402,7 @@ class BrokerConnection(object): password=self.config['ssl_password']) if self.config['ssl_crlfile']: if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'): - error = 'No CRL support with this version of Python.' - log.error('%s: %s Disconnecting.', self, error) - self.close(Errors.ConnectionError(error)) - return + raise RuntimeError('This version of Python does not support ssl_crlfile!') log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile']) self._ssl_context.load_verify_locations(self.config['ssl_crlfile']) # pylint: disable=no-member @@ -448,7 +445,9 @@ class BrokerConnection(object): self._sasl_auth_future = future self._recv() if self._sasl_auth_future.failed(): - raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type + ex = self._sasl_auth_future.exception + if not isinstance(ex, Errors.ConnectionError): + raise ex # pylint: disable-msg=raising-bad-type return self._sasl_auth_future.succeeded() def _handle_sasl_handshake_response(self, future, response): @@ -468,6 +467,19 @@ class BrokerConnection(object): 'kafka-python does not support SASL mechanism %s' % self.config['sasl_mechanism'])) + def _recv_bytes_blocking(self, n): + self._sock.setblocking(True) + try: + data = b'' + while len(data) < n: + fragment = self._sock.recv(n - len(data)) + if not fragment: + raise ConnectionError('Connection reset during recv') + data += fragment + return data + finally: + self._sock.setblocking(False) + def _try_authenticate_plain(self, future): if self.config['security_protocol'] == 'SASL_PLAINTEXT': log.warning('%s: Sending username and password in the clear', self) @@ -481,30 +493,23 @@ class BrokerConnection(object): self.config['sasl_plain_password']]).encode('utf-8')) size = Int32.encode(len(msg)) self._sock.sendall(size + msg) + self._sock.setblocking(False) # The server will send a zero sized message (that is Int32(0)) on success. # The connection is closed on failure - while len(data) < 4: - fragment = self._sock.recv(4 - len(data)) - if not fragment: - log.error('%s: Authentication failed for user %s', self, self.config['sasl_plain_username']) - error = Errors.AuthenticationFailedError( - 'Authentication failed for user {0}'.format( - self.config['sasl_plain_username'])) - future.failure(error) - raise error - data += fragment - self._sock.setblocking(False) - except (AssertionError, ConnectionError) as e: + self._recv_bytes_blocking(4) + + except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) self.close(error=error) + return future.failure(error) if data != b'\x00\x00\x00\x00': - return future.failure(Errors.AuthenticationFailedError()) + error = Errors.AuthenticationFailedError('Unrecognized response during authentication') + return future.failure(error) - log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username']) + log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) return future.success(True) def _try_authenticate_gssapi(self, future): @@ -514,52 +519,41 @@ class BrokerConnection(object): ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos) log.debug('%s: canonical Servicename: %s', self, ctx_CanonName) ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate') - # Exchange tokens until authentication either succeeds or fails: + log.debug("%s: initiator name: %s", self, ctx_Context.initiator_name) + + # Exchange tokens until authentication either succeeds or fails received_token = None try: while not ctx_Context.complete: - # calculate the output token - try: - output_token = ctx_Context.step(received_token) - except GSSError as e: - log.exception("%s: Error invalid token received from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) + # calculate an output token from kafka token (or None if first iteration) + output_token = ctx_Context.step(received_token) - if not output_token: - if ctx_Context.complete: - log.debug("%s: Security Context complete ", self) - log.debug("%s: Successful GSSAPI handshake for %s", self, ctx_Context.initiator_name) - break + # pass output token to kafka try: self._sock.setblocking(True) - # Send output token msg = output_token size = Int32.encode(len(msg)) self._sock.sendall(size + msg) + self._sock.setblocking(False) # The server will send a token back. Processing of this token either # establishes a security context, or it needs further token exchange. # The gssapi will be able to identify the needed next step. # The connection is closed on failure. - response = self._sock.recv(2000) - self._sock.setblocking(False) + header = self._recv_bytes_blocking(4) + token_size = struct.unpack('>i', header) + received_token = self._recv_bytes_blocking(token_size) - except (AssertionError, ConnectionError) as e: + except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) self.close(error=error) - - # pass the received token back to gssapi, strip the first 4 bytes - received_token = response[4:] + return future.failure(error) except Exception as e: - log.exception("%s: GSSAPI handshake error", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) - self.close(error=error) + return future.failure(e) - log.info('%s: Authenticated as %s', self, gssname) + log.info('%s: Authenticated as %s via GSSAPI', self, gssname) return future.success(True) def blacked_out(self): @@ -635,19 +629,16 @@ class BrokerConnection(object): self.state = ConnectionStates.DISCONNECTED self.last_attempt = time.time() self._sasl_auth_future = None - self._reset_buffer() + self._protocol = KafkaProtocol( + client_id=self.config['client_id'], + api_version=self.config['api_version']) if error is None: error = Errors.Cancelled(str(self)) while self.in_flight_requests: - ifr = self.in_flight_requests.popleft() - ifr.future.failure(error) + (_, future, _) = self.in_flight_requests.popleft() + future.failure(error) self.config['state_change_callback'](self) - def _reset_buffer(self): - self._receiving = False - self._header.seek(0) - self._rbuffer = None - def send(self, request): """send request, return Future() @@ -665,13 +656,8 @@ class BrokerConnection(object): def _send(self, request): assert self.state in (ConnectionStates.AUTHENTICATING, ConnectionStates.CONNECTED) future = Future() - correlation_id = self._next_correlation_id() - header = RequestHeader(request, - correlation_id=correlation_id, - client_id=self.config['client_id']) - message = b''.join([header.encode(), request.encode()]) - size = Int32.encode(len(message)) - data = size + message + correlation_id = self._protocol.send_request(request) + data = self._protocol.send_bytes() try: # In the future we might manage an internal write buffer # and send bytes asynchronously. For now, just block @@ -693,11 +679,7 @@ class BrokerConnection(object): log.debug('%s Request %d: %s', self, correlation_id, request) if request.expect_response(): - ifr = InFlightRequest(request=request, - correlation_id=correlation_id, - response_type=request.RESPONSE_TYPE, - future=future, - timestamp=time.time()) + ifr = (correlation_id, future, time.time()) self.in_flight_requests.append(ifr) else: future.success(None) @@ -714,7 +696,6 @@ class BrokerConnection(object): Return response if available """ - assert not self._processing, 'Recursion not supported' if not self.connected() and not self.state is ConnectionStates.AUTHENTICATING: log.warning('%s cannot recv: socket not connected', self) # If requests are pending, we should close the socket and @@ -727,15 +708,28 @@ class BrokerConnection(object): log.warning('%s: No in-flight-requests to recv', self) return () - response = self._recv() - if not response and self.requests_timed_out(): + responses = self._recv() + if not responses and self.requests_timed_out(): log.warning('%s timed out after %s ms. Closing connection.', self, self.config['request_timeout_ms']) self.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % self.config['request_timeout_ms'])) return () - return response + + for response in responses: + (correlation_id, future, timestamp) = self.in_flight_requests.popleft() + if isinstance(response, Errors.KafkaError): + self.close(response) + break + + if self._sensors: + self._sensors.request_time.record((time.time() - timestamp) * 1000) + + log.debug('%s Response %d: %s', self, correlation_id, response) + future.success(response) + + return responses def _recv(self): responses = [] @@ -751,10 +745,7 @@ class BrokerConnection(object): log.error('%s: socket disconnected', self) self.close(error=Errors.ConnectionError('socket disconnected')) break - else: - responses.extend(self.receive_bytes(data)) - if len(data) < SOCK_CHUNK_BYTES: - break + except SSLWantReadError: break except ConnectionError as e: @@ -768,118 +759,26 @@ class BrokerConnection(object): if six.PY3: break raise - return responses - def receive_bytes(self, data): - i = 0 - n = len(data) - responses = [] - if self._sensors: - self._sensors.bytes_received.record(n) - while i < n: - - # Not receiving is the state of reading the payload header - if not self._receiving: - bytes_to_read = min(4 - self._header.tell(), n - i) - self._header.write(data[i:i+bytes_to_read]) - i += bytes_to_read - - if self._header.tell() == 4: - self._header.seek(0) - nbytes = Int32.decode(self._header) - # reset buffer and switch state to receiving payload bytes - self._rbuffer = KafkaBytes(nbytes) - self._receiving = True - elif self._header.tell() > 4: - raise Errors.KafkaError('this should not happen - are you threading?') - - - if self._receiving: - total_bytes = len(self._rbuffer) - staged_bytes = self._rbuffer.tell() - bytes_to_read = min(total_bytes - staged_bytes, n - i) - self._rbuffer.write(data[i:i+bytes_to_read]) - i += bytes_to_read - - staged_bytes = self._rbuffer.tell() - if staged_bytes > total_bytes: - self.close(error=Errors.KafkaError('Receive buffer has more bytes than expected?')) - - if staged_bytes != total_bytes: - break + if self._sensors: + self._sensors.bytes_received.record(len(data)) - self._receiving = False - self._rbuffer.seek(0) - resp = self._process_response(self._rbuffer) - if resp is not None: - responses.append(resp) - self._reset_buffer() - return responses + try: + more_responses = self._protocol.receive_bytes(data) + except Errors.KafkaProtocolError as e: + self.close(e) + break + else: + responses.extend([resp for (_, resp) in more_responses]) - def _process_response(self, read_buffer): - assert not self._processing, 'Recursion not supported' - self._processing = True - recv_correlation_id = Int32.decode(read_buffer) - - if not self.in_flight_requests: - error = Errors.CorrelationIdError( - '%s: No in-flight-request found for server response' - ' with correlation ID %d' - % (self, recv_correlation_id)) - self.close(error) - self._processing = False - return None - else: - ifr = self.in_flight_requests.popleft() - - if self._sensors: - self._sensors.request_time.record((time.time() - ifr.timestamp) * 1000) - - # verify send/recv correlation ids match - - # 0.8.2 quirk - if (self.config['api_version'] == (0, 8, 2) and - ifr.response_type is GroupCoordinatorResponse[0] and - ifr.correlation_id != 0 and - recv_correlation_id == 0): - log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse' - ' Correlation ID does not match request. This' - ' should go away once at least one topic has been' - ' initialized on the broker.') - - elif ifr.correlation_id != recv_correlation_id: - error = Errors.CorrelationIdError( - '%s: Correlation IDs do not match: sent %d, recv %d' - % (self, ifr.correlation_id, recv_correlation_id)) - ifr.future.failure(error) - self.close(error) - self._processing = False - return None - - # decode response - try: - response = ifr.response_type.decode(read_buffer) - except ValueError: - read_buffer.seek(0) - buf = read_buffer.read() - log.error('%s Response %d [ResponseType: %s Request: %s]:' - ' Unable to decode %d-byte buffer: %r', self, - ifr.correlation_id, ifr.response_type, - ifr.request, len(buf), buf) - error = Errors.UnknownError('Unable to decode response') - ifr.future.failure(error) - self.close(error) - self._processing = False - return None - - log.debug('%s Response %d: %s', self, ifr.correlation_id, response) - ifr.future.success(response) - self._processing = False - return response + if len(data) < SOCK_CHUNK_BYTES: + break + + return responses def requests_timed_out(self): if self.in_flight_requests: - oldest_at = self.in_flight_requests[0].timestamp + (_, _, oldest_at) = self.in_flight_requests[0] timeout = self.config['request_timeout_ms'] / 1000.0 if time.time() >= oldest_at + timeout: return True diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index b7fbd83..a83d5da 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -846,13 +846,20 @@ class KafkaConsumer(six.Iterator): log.debug("Unsubscribed all topics or patterns and assigned partitions") def metrics(self, raw=False): - """Warning: this is an unstable interface. - It may change in future releases without warning""" + """Get metrics on consumer performance. + + This is ported from the Java Consumer, for details see: + https://kafka.apache.org/documentation/#new_consumer_monitoring + + Warning: + This is an unstable interface. It may change in future + releases without warning. + """ if raw: return self._metrics.metrics metrics = {} - for k, v in self._metrics.metrics.items(): + for k, v in six.iteritems(self._metrics.metrics): if k.group not in metrics: metrics[k.group] = {} if k.name not in metrics[k.group]: @@ -897,7 +904,7 @@ class KafkaConsumer(six.Iterator): raise UnsupportedVersionError( "offsets_for_times API not supported for cluster version {}" .format(self.config['api_version'])) - for tp, ts in timestamps.items(): + for tp, ts in six.iteritems(timestamps): timestamps[tp] = int(ts) if ts < 0: raise ValueError( diff --git a/kafka/errors.py b/kafka/errors.py index 35f9d94..c72455a 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -33,7 +33,11 @@ class NodeNotReadyError(KafkaError): retriable = True -class CorrelationIdError(KafkaError): +class KafkaProtocolError(KafkaError): + retriable = True + + +class CorrelationIdError(KafkaProtocolError): retriable = True diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 09ca744..de9dcd2 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -8,6 +8,8 @@ import threading import time import weakref +from ..vendor import six + from .. import errors as Errors from ..client_async import KafkaClient, selectors from ..metrics import MetricConfig, Metrics @@ -566,10 +568,10 @@ class KafkaProducer(object): Arguments: timeout (float, optional): timeout in seconds to wait for completion. - + Raises: - KafkaTimeoutError: failure to flush buffered records within the - provided timeout + KafkaTimeoutError: failure to flush buffered records within the + provided timeout """ log.debug("Flushing accumulated records in producer.") # trace self._accumulator.begin_flush() @@ -655,13 +657,20 @@ class KafkaProducer(object): available) def metrics(self, raw=False): - """Warning: this is an unstable interface. - It may change in future releases without warning""" + """Get metrics on producer performance. + + This is ported from the Java Producer, for details see: + https://kafka.apache.org/documentation/#producer_monitoring + + Warning: + This is an unstable interface. It may change in future + releases without warning. + """ if raw: return self._metrics.metrics metrics = {} - for k, v in self._metrics.metrics.items(): + for k, v in six.iteritems(self._metrics.metrics): if k.group not in metrics: metrics[k.group] = {} if k.name not in metrics[k.group]: diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py new file mode 100644 index 0000000..4d77bb3 --- /dev/null +++ b/kafka/protocol/parser.py @@ -0,0 +1,177 @@ +from __future__ import absolute_import + +import collections +import logging + +import kafka.errors as Errors +from kafka.protocol.api import RequestHeader +from kafka.protocol.commit import GroupCoordinatorResponse +from kafka.protocol.frame import KafkaBytes +from kafka.protocol.types import Int32 +from kafka.version import __version__ + +log = logging.getLogger(__name__) + + +class KafkaProtocol(object): + """Manage the kafka network protocol + + Use an instance of KafkaProtocol to manage bytes send/recv'd + from a network socket to a broker. + """ + def __init__(self, client_id=None, api_version=None): + if client_id is None: + client_id = self._gen_client_id() + self._client_id = client_id + self._api_version = api_version + self._correlation_id = 0 + self._header = KafkaBytes(4) + self._rbuffer = None + self._receiving = False + self.in_flight_requests = collections.deque() + self.bytes_to_send = [] + + def _next_correlation_id(self): + self._correlation_id = (self._correlation_id + 1) % 2**31 + return self._correlation_id + + def _gen_client_id(self): + return 'kafka-python' + __version__ + + def send_request(self, request, correlation_id=None): + """Encode and queue a kafka api request for sending. + + Arguments: + request (object): An un-encoded kafka request. + correlation_id (int, optional): Optionally specify an ID to + correlate requests with responses. If not provided, an ID will + be generated automatically. + + Returns: + correlation_id + """ + log.debug('Sending request %s', request) + if correlation_id is None: + correlation_id = self._next_correlation_id() + header = RequestHeader(request, + correlation_id=correlation_id, + client_id=self._client_id) + message = b''.join([header.encode(), request.encode()]) + size = Int32.encode(len(message)) + data = size + message + self.bytes_to_send.append(data) + if request.expect_response(): + ifr = (correlation_id, request) + self.in_flight_requests.append(ifr) + return correlation_id + + def send_bytes(self): + """Retrieve all pending bytes to send on the network""" + data = b''.join(self.bytes_to_send) + self.bytes_to_send = [] + return data + + def receive_bytes(self, data): + """Process bytes received from the network. + + Arguments: + data (bytes): any length bytes received from a network connection + to a kafka broker. + + Returns: + responses (list of (correlation_id, response)): any/all completed + responses, decoded from bytes to python objects. + + Raises: + KafkaProtocolError: if the bytes received could not be decoded. + CorrelationIdError: if the response does not match the request + correlation id. + """ + i = 0 + n = len(data) + responses = [] + while i < n: + + # Not receiving is the state of reading the payload header + if not self._receiving: + bytes_to_read = min(4 - self._header.tell(), n - i) + self._header.write(data[i:i+bytes_to_read]) + i += bytes_to_read + + if self._header.tell() == 4: + self._header.seek(0) + nbytes = Int32.decode(self._header) + # reset buffer and switch state to receiving payload bytes + self._rbuffer = KafkaBytes(nbytes) + self._receiving = True + elif self._header.tell() > 4: + raise Errors.KafkaError('this should not happen - are you threading?') + + if self._receiving: + total_bytes = len(self._rbuffer) + staged_bytes = self._rbuffer.tell() + bytes_to_read = min(total_bytes - staged_bytes, n - i) + self._rbuffer.write(data[i:i+bytes_to_read]) + i += bytes_to_read + + staged_bytes = self._rbuffer.tell() + if staged_bytes > total_bytes: + raise Errors.KafkaError('Receive buffer has more bytes than expected?') + + if staged_bytes != total_bytes: + break + + self._receiving = False + self._rbuffer.seek(0) + resp = self._process_response(self._rbuffer) + responses.append(resp) + self._reset_buffer() + return responses + + def _process_response(self, read_buffer): + recv_correlation_id = Int32.decode(read_buffer) + log.debug('Received correlation id: %d', recv_correlation_id) + + if not self.in_flight_requests: + raise Errors.CorrelationIdError( + 'No in-flight-request found for server response' + ' with correlation ID %d' + % recv_correlation_id) + + (correlation_id, request) = self.in_flight_requests.popleft() + + # 0.8.2 quirk + if (self._api_version == (0, 8, 2) and + request.RESPONSE_TYPE is GroupCoordinatorResponse[0] and + correlation_id != 0 and + recv_correlation_id == 0): + log.warning('Kafka 0.8.2 quirk -- GroupCoordinatorResponse' + ' Correlation ID does not match request. This' + ' should go away once at least one topic has been' + ' initialized on the broker.') + + elif correlation_id != recv_correlation_id: + # return or raise? + raise Errors.CorrelationIdError( + 'Correlation IDs do not match: sent %d, recv %d' + % (correlation_id, recv_correlation_id)) + + # decode response + log.debug('Processing response %s', request.RESPONSE_TYPE.__name__) + try: + response = request.RESPONSE_TYPE.decode(read_buffer) + except ValueError: + read_buffer.seek(0) + buf = read_buffer.read() + log.error('Response %d [ResponseType: %s Request: %s]:' + ' Unable to decode %d-byte buffer: %r', + correlation_id, request.RESPONSE_TYPE, + request, len(buf), buf) + raise Errors.KafkaProtocolError('Unable to decode response') + + return (correlation_id, response) + + def _reset_buffer(self): + self._receiving = False + self._header.seek(0) + self._rbuffer = None diff --git a/servers/0.11.0.1/resources/kafka.properties b/servers/0.11.0.1/resources/kafka.properties new file mode 100644 index 0000000..f08855c --- /dev/null +++ b/servers/0.11.0.1/resources/kafka.properties @@ -0,0 +1,142 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={broker_id} + +############################# Socket Server Settings ############################# + +listeners={transport}://{host}:{port} +security.inter.broker.protocol={transport} + +ssl.keystore.location={ssl_dir}/server.keystore.jks +ssl.keystore.password=foobar +ssl.key.password=foobar +ssl.truststore.location={ssl_dir}/server.truststore.jks +ssl.truststore.password=foobar + +# The port the socket server listens on +#port=9092 + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name=<hostname routable by clients> + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. +#advertised.port=<port accessible by clients> + +# The number of threads handling network requests +num.network.threads=3 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs={tmp_dir}/data + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={partitions} +default.replication.factor={replicas} + +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. + +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +# tune down offset topics to reduce setup time in tests +offsets.commit.timeout.ms=500 +offsets.topic.num.partitions=2 +offsets.topic.replication.factor=1 + +# Allow shorter session timeouts for tests +group.min.session.timeout.ms=1000 + + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". +# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=30000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/servers/0.11.0.1/resources/log4j.properties b/servers/0.11.0.1/resources/log4j.properties new file mode 100644 index 0000000..b0b76aa --- /dev/null +++ b/servers/0.11.0.1/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +log4j.rootLogger=INFO, stdout, logfile + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.File=${kafka.logs.dir}/server.log +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=[%d] %p %m (%c)%n diff --git a/servers/0.11.0.1/resources/zookeeper.properties b/servers/0.11.0.1/resources/zookeeper.properties new file mode 100644 index 0000000..e3fd097 --- /dev/null +++ b/servers/0.11.0.1/resources/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. +dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/test/fixtures.py b/test/fixtures.py index e50ce12..c131f5a 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -21,7 +21,7 @@ log = logging.getLogger(__name__) class Fixture(object): - kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0') + kafka_version = os.environ.get('KAFKA_VERSION', '0.11.0.1') scala_version = os.environ.get("SCALA_VERSION", '2.8.0') project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin")) |