diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/__init__.py | 28 | ||||
-rw-r--r-- | kafka/client.py | 719 | ||||
-rw-r--r-- | kafka/common.py | 4 | ||||
-rw-r--r-- | kafka/consumer/__init__.py | 4 | ||||
-rw-r--r-- | kafka/consumer/base.py | 232 | ||||
-rw-r--r-- | kafka/consumer/group.py | 25 | ||||
-rw-r--r-- | kafka/consumer/multiprocess.py | 295 | ||||
-rw-r--r-- | kafka/consumer/simple.py | 444 | ||||
-rw-r--r-- | kafka/context.py | 178 | ||||
-rw-r--r-- | kafka/errors.py | 16 | ||||
-rw-r--r-- | kafka/partitioner/__init__.py | 8 | ||||
-rw-r--r-- | kafka/partitioner/base.py | 27 | ||||
-rw-r--r-- | kafka/partitioner/default.py | 72 | ||||
-rw-r--r-- | kafka/partitioner/hashed.py | 118 | ||||
-rw-r--r-- | kafka/partitioner/roundrobin.py | 70 | ||||
-rw-r--r-- | kafka/producer/__init__.py | 5 | ||||
-rw-r--r-- | kafka/producer/base.py | 482 | ||||
-rw-r--r-- | kafka/producer/keyed.py | 49 | ||||
-rw-r--r-- | kafka/producer/simple.py | 54 | ||||
-rw-r--r-- | kafka/protocol/__init__.py | 6 | ||||
-rw-r--r-- | kafka/protocol/legacy.py | 474 | ||||
-rw-r--r-- | kafka/structs.py | 69 | ||||
-rw-r--r-- | kafka/util.py | 108 |
23 files changed, 79 insertions, 3408 deletions
diff --git a/kafka/__init__.py b/kafka/__init__.py index cafa043..d5e30af 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -19,38 +19,16 @@ logging.getLogger(__name__).addHandler(NullHandler()) from kafka.admin import KafkaAdminClient +from kafka.client_async import KafkaClient from kafka.consumer import KafkaConsumer from kafka.consumer.subscription_state import ConsumerRebalanceListener from kafka.producer import KafkaProducer from kafka.conn import BrokerConnection -from kafka.protocol import ( - create_message, create_gzip_message, create_snappy_message) -from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner from kafka.serializer import Serializer, Deserializer from kafka.structs import TopicPartition, OffsetAndMetadata -# To be deprecated when KafkaProducer interface is released -from kafka.client import SimpleClient -from kafka.producer import SimpleProducer, KeyedProducer - -# deprecated in favor of KafkaConsumer -from kafka.consumer import SimpleConsumer, MultiProcessConsumer - - -import warnings -class KafkaClient(SimpleClient): - def __init__(self, *args, **kwargs): - warnings.warn('The legacy KafkaClient interface has been moved to' - ' kafka.SimpleClient - this import will break in a' - ' future release', DeprecationWarning) - super(KafkaClient, self).__init__(*args, **kwargs) - __all__ = [ - 'KafkaAdminClient', - 'KafkaConsumer', 'KafkaProducer', 'KafkaClient', 'BrokerConnection', - 'SimpleClient', 'SimpleProducer', 'KeyedProducer', - 'RoundRobinPartitioner', 'HashedPartitioner', - 'create_message', 'create_gzip_message', 'create_snappy_message', - 'SimpleConsumer', 'MultiProcessConsumer', 'ConsumerRebalanceListener', + 'BrokerConnection', 'ConsumerRebalanceListener', 'KafkaAdminClient', + 'KafkaClient', 'KafkaConsumer', 'KafkaProducer', ] diff --git a/kafka/client.py b/kafka/client.py deleted file mode 100644 index 148cae0..0000000 --- a/kafka/client.py +++ /dev/null @@ -1,719 +0,0 @@ -from __future__ import absolute_import - -import collections -import copy -import functools -import logging -import random -import time -import select - -from kafka.vendor import six - -import kafka.errors -from kafka.errors import (UnknownError, KafkaConnectionError, FailedPayloadsError, - KafkaTimeoutError, KafkaUnavailableError, - LeaderNotAvailableError, UnknownTopicOrPartitionError, - NotLeaderForPartitionError, ReplicaNotAvailableError) -from kafka.structs import TopicPartition, BrokerMetadata - -from kafka.conn import ( - collect_hosts, BrokerConnection, - ConnectionStates, get_ip_port_afi) -from kafka.protocol import KafkaProtocol - -# New KafkaClient -# this is not exposed in top-level imports yet, -# due to conflicts with legacy SimpleConsumer / SimpleProducer usage -from kafka.client_async import KafkaClient - - -log = logging.getLogger(__name__) - - -# Legacy KafkaClient interface -- will be deprecated soon -class SimpleClient(object): - - CLIENT_ID = b'kafka-python' - DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 - - # NOTE: The timeout given to the client should always be greater than the - # one passed to SimpleConsumer.get_message(), otherwise you can get a - # socket timeout. - def __init__(self, hosts, client_id=CLIENT_ID, - timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS, - correlation_id=0): - # We need one connection to bootstrap - self.client_id = client_id - self.timeout = timeout - self.hosts = collect_hosts(hosts) - self.correlation_id = correlation_id - - self._conns = {} - self.brokers = {} # broker_id -> BrokerMetadata - self.topics_to_brokers = {} # TopicPartition -> BrokerMetadata - self.topic_partitions = {} # topic -> partition -> leader - - self.load_metadata_for_topics() # bootstrap with all metadata - - ################## - # Private API # - ################## - - def _get_conn(self, host, port, afi): - """Get or create a connection to a broker using host and port""" - host_key = (host, port) - if host_key not in self._conns: - self._conns[host_key] = BrokerConnection( - host, port, afi, - request_timeout_ms=self.timeout * 1000, - client_id=self.client_id - ) - - conn = self._conns[host_key] - if not conn.connect_blocking(self.timeout): - conn.close() - raise KafkaConnectionError("%s:%s (%s)" % (host, port, afi)) - return conn - - def _get_leader_for_partition(self, topic, partition): - """ - Returns the leader for a partition or None if the partition exists - but has no leader. - - Raises: - UnknownTopicOrPartitionError: If the topic or partition is not part - of the metadata. - LeaderNotAvailableError: If the server has metadata, but there is no - current leader. - """ - - key = TopicPartition(topic, partition) - - # Use cached metadata if it is there - if self.topics_to_brokers.get(key) is not None: - return self.topics_to_brokers[key] - - # Otherwise refresh metadata - - # If topic does not already exist, this will raise - # UnknownTopicOrPartitionError if not auto-creating - # LeaderNotAvailableError otherwise until partitions are created - self.load_metadata_for_topics(topic) - - # If the partition doesn't actually exist, raise - if partition not in self.topic_partitions.get(topic, []): - raise UnknownTopicOrPartitionError(key) - - # If there's no leader for the partition, raise - leader = self.topic_partitions[topic][partition] - if leader == -1: - raise LeaderNotAvailableError((topic, partition)) - - # Otherwise return the BrokerMetadata - return self.brokers[leader] - - def _get_coordinator_for_group(self, group): - """ - Returns the coordinator broker for a consumer group. - - GroupCoordinatorNotAvailableError will be raised if the coordinator - does not currently exist for the group. - - GroupLoadInProgressError is raised if the coordinator is available - but is still loading offsets from the internal topic - """ - - resp = self.send_consumer_metadata_request(group) - - # If there's a problem with finding the coordinator, raise the - # provided error - kafka.errors.check_error(resp) - - # Otherwise return the BrokerMetadata - return BrokerMetadata(resp.nodeId, resp.host, resp.port, None) - - def _next_id(self): - """Generate a new correlation id""" - # modulo to keep w/i int32 - self.correlation_id = (self.correlation_id + 1) % 2**31 - return self.correlation_id - - def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn): - """ - Attempt to send a broker-agnostic request to one of the available - brokers. Keep trying until you succeed. - """ - hosts = set() - for broker in self.brokers.values(): - host, port, afi = get_ip_port_afi(broker.host) - hosts.add((host, broker.port, afi)) - - hosts.update(self.hosts) - hosts = list(hosts) - random.shuffle(hosts) - - for (host, port, afi) in hosts: - try: - conn = self._get_conn(host, port, afi) - except KafkaConnectionError: - log.warning("Skipping unconnected connection: %s:%s (AFI %s)", - host, port, afi) - continue - request = encoder_fn(payloads=payloads) - future = conn.send(request) - - # Block - while not future.is_done: - for r, f in conn.recv(): - f.success(r) - - if future.failed(): - log.error("Request failed: %s", future.exception) - continue - - return decoder_fn(future.value) - - raise KafkaUnavailableError('All servers failed to process request: %s' % (hosts,)) - - def _payloads_by_broker(self, payloads): - payloads_by_broker = collections.defaultdict(list) - for payload in payloads: - try: - leader = self._get_leader_for_partition(payload.topic, payload.partition) - except (KafkaUnavailableError, LeaderNotAvailableError, - UnknownTopicOrPartitionError): - leader = None - payloads_by_broker[leader].append(payload) - return dict(payloads_by_broker) - - def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn): - """ - Group a list of request payloads by topic+partition and send them to - the leader broker for that partition using the supplied encode/decode - functions - - Arguments: - - payloads: list of object-like entities with a topic (str) and - partition (int) attribute; payloads with duplicate topic-partitions - are not supported. - - encode_fn: a method to encode the list of payloads to a request body, - must accept client_id, correlation_id, and payloads as - keyword arguments - - decode_fn: a method to decode a response body into response objects. - The response objects must be object-like and have topic - and partition attributes - - Returns: - - List of response objects in the same order as the supplied payloads - """ - # encoders / decoders do not maintain ordering currently - # so we need to keep this so we can rebuild order before returning - original_ordering = [(p.topic, p.partition) for p in payloads] - - # Connection errors generally mean stale metadata - # although sometimes it means incorrect api request - # Unfortunately there is no good way to tell the difference - # so we'll just reset metadata on all errors to be safe - refresh_metadata = False - - # For each broker, send the list of request payloads - # and collect the responses and errors - payloads_by_broker = self._payloads_by_broker(payloads) - responses = {} - - def failed_payloads(payloads): - for payload in payloads: - topic_partition = (str(payload.topic), payload.partition) - responses[(topic_partition)] = FailedPayloadsError(payload) - - # For each BrokerConnection keep the real socket so that we can use - # a select to perform unblocking I/O - connections_by_future = {} - for broker, broker_payloads in six.iteritems(payloads_by_broker): - if broker is None: - failed_payloads(broker_payloads) - continue - - host, port, afi = get_ip_port_afi(broker.host) - try: - conn = self._get_conn(host, broker.port, afi) - except KafkaConnectionError: - refresh_metadata = True - failed_payloads(broker_payloads) - continue - - request = encoder_fn(payloads=broker_payloads) - future = conn.send(request) - - if future.failed(): - refresh_metadata = True - failed_payloads(broker_payloads) - continue - - if not request.expect_response(): - for payload in broker_payloads: - topic_partition = (str(payload.topic), payload.partition) - responses[topic_partition] = None - continue - - connections_by_future[future] = (conn, broker) - - conn = None - while connections_by_future: - futures = list(connections_by_future.keys()) - - # block until a socket is ready to be read - sockets = [ - conn._sock - for future, (conn, _) in six.iteritems(connections_by_future) - if not future.is_done and conn._sock is not None] - if sockets: - read_socks, _, _ = select.select(sockets, [], []) - - for future in futures: - - if not future.is_done: - conn, _ = connections_by_future[future] - for r, f in conn.recv(): - f.success(r) - continue - - _, broker = connections_by_future.pop(future) - if future.failed(): - refresh_metadata = True - failed_payloads(payloads_by_broker[broker]) - - else: - for payload_response in decoder_fn(future.value): - topic_partition = (str(payload_response.topic), - payload_response.partition) - responses[topic_partition] = payload_response - - if refresh_metadata: - self.reset_all_metadata() - - # Return responses in the same order as provided - return [responses[tp] for tp in original_ordering] - - def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn): - """ - Send a list of requests to the consumer coordinator for the group - specified using the supplied encode/decode functions. As the payloads - that use consumer-aware requests do not contain the group (e.g. - OffsetFetchRequest), all payloads must be for a single group. - - Arguments: - - group: the name of the consumer group (str) the payloads are for - payloads: list of object-like entities with topic (str) and - partition (int) attributes; payloads with duplicate - topic+partition are not supported. - - encode_fn: a method to encode the list of payloads to a request body, - must accept client_id, correlation_id, and payloads as - keyword arguments - - decode_fn: a method to decode a response body into response objects. - The response objects must be object-like and have topic - and partition attributes - - Returns: - - List of response objects in the same order as the supplied payloads - """ - # encoders / decoders do not maintain ordering currently - # so we need to keep this so we can rebuild order before returning - original_ordering = [(p.topic, p.partition) for p in payloads] - - broker = self._get_coordinator_for_group(group) - - # Send the list of request payloads and collect the responses and - # errors - responses = {} - request_id = self._next_id() - log.debug('Request %s to %s: %s', request_id, broker, payloads) - request = encoder_fn(client_id=self.client_id, - correlation_id=request_id, payloads=payloads) - - # Send the request, recv the response - try: - host, port, afi = get_ip_port_afi(broker.host) - conn = self._get_conn(host, broker.port, afi) - except KafkaConnectionError as e: - log.warning('KafkaConnectionError attempting to send request %s ' - 'to server %s: %s', request_id, broker, e) - - for payload in payloads: - topic_partition = (payload.topic, payload.partition) - responses[topic_partition] = FailedPayloadsError(payload) - - # No exception, try to get response - else: - - future = conn.send(request_id, request) - while not future.is_done: - for r, f in conn.recv(): - f.success(r) - - # decoder_fn=None signal that the server is expected to not - # send a response. This probably only applies to - # ProduceRequest w/ acks = 0 - if decoder_fn is None: - log.debug('Request %s does not expect a response ' - '(skipping conn.recv)', request_id) - for payload in payloads: - topic_partition = (payload.topic, payload.partition) - responses[topic_partition] = None - return [] - - if future.failed(): - log.warning('Error attempting to receive a ' - 'response to request %s from server %s: %s', - request_id, broker, future.exception) - - for payload in payloads: - topic_partition = (payload.topic, payload.partition) - responses[topic_partition] = FailedPayloadsError(payload) - - else: - response = future.value - _resps = [] - for payload_response in decoder_fn(response): - topic_partition = (payload_response.topic, - payload_response.partition) - responses[topic_partition] = payload_response - _resps.append(payload_response) - log.debug('Response %s: %s', request_id, _resps) - - # Return responses in the same order as provided - return [responses[tp] for tp in original_ordering] - - def __repr__(self): - return '<SimpleClient client_id=%s>' % (self.client_id) - - def _raise_on_response_error(self, resp): - - # Response can be an unraised exception object (FailedPayloadsError) - if isinstance(resp, Exception): - raise resp - - # Or a server api error response - try: - kafka.errors.check_error(resp) - except (UnknownTopicOrPartitionError, NotLeaderForPartitionError): - self.reset_topic_metadata(resp.topic) - raise - - # Return False if no error to enable list comprehensions - return False - - ################# - # Public API # - ################# - def close(self): - for conn in self._conns.values(): - conn.close() - - def copy(self): - """ - Create an inactive copy of the client object, suitable for passing - to a separate thread. - - Note that the copied connections are not initialized, so :meth:`.reinit` - must be called on the returned copy. - """ - _conns = self._conns - self._conns = {} - c = copy.deepcopy(self) - self._conns = _conns - return c - - def reinit(self): - timeout = time.time() + self.timeout - conns = set(self._conns.values()) - for conn in conns: - conn.close() - conn.connect() - - while time.time() < timeout: - for conn in list(conns): - conn.connect() - if conn.connected(): - conns.remove(conn) - if not conns: - break - - def reset_topic_metadata(self, *topics): - for topic in topics: - for topic_partition in list(self.topics_to_brokers.keys()): - if topic_partition.topic == topic: - del self.topics_to_brokers[topic_partition] - if topic in self.topic_partitions: - del self.topic_partitions[topic] - - def reset_all_metadata(self): - self.topics_to_brokers.clear() - self.topic_partitions.clear() - - def has_metadata_for_topic(self, topic): - return ( - topic in self.topic_partitions - and len(self.topic_partitions[topic]) > 0 - ) - - def get_partition_ids_for_topic(self, topic): - if topic not in self.topic_partitions: - return [] - - return sorted(list(self.topic_partitions[topic])) - - @property - def topics(self): - return list(self.topic_partitions.keys()) - - def ensure_topic_exists(self, topic, timeout=30): - start_time = time.time() - - while not self.has_metadata_for_topic(topic): - if time.time() > start_time + timeout: - raise KafkaTimeoutError('Unable to create topic {0}'.format(topic)) - self.load_metadata_for_topics(topic, ignore_leadernotavailable=True) - time.sleep(.5) - - def load_metadata_for_topics(self, *topics, **kwargs): - """Fetch broker and topic-partition metadata from the server. - - Updates internal data: broker list, topic/partition list, and - topic/partition -> broker map. This method should be called after - receiving any error. - - Note: Exceptions *will not* be raised in a full refresh (i.e. no topic - list). In this case, error codes will be logged as errors. - Partition-level errors will also not be raised here (a single partition - w/o a leader, for example). - - Arguments: - *topics (optional): If a list of topics is provided, - the metadata refresh will be limited to the specified topics - only. - ignore_leadernotavailable (bool): suppress LeaderNotAvailableError - so that metadata is loaded correctly during auto-create. - Default: False. - - Raises: - UnknownTopicOrPartitionError: Raised for topics that do not exist, - unless the broker is configured to auto-create topics. - LeaderNotAvailableError: Raised for topics that do not exist yet, - when the broker is configured to auto-create topics. Retry - after a short backoff (topics/partitions are initializing). - """ - if 'ignore_leadernotavailable' in kwargs: - ignore_leadernotavailable = kwargs['ignore_leadernotavailable'] - else: - ignore_leadernotavailable = False - - if topics: - self.reset_topic_metadata(*topics) - else: - self.reset_all_metadata() - - resp = self.send_metadata_request(topics) - - log.debug('Updating broker metadata: %s', resp.brokers) - log.debug('Updating topic metadata: %s', [topic for _, topic, _ in resp.topics]) - - self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port, None)) - for nodeId, host, port in resp.brokers]) - - for error, topic, partitions in resp.topics: - # Errors expected for new topics - if error: - error_type = kafka.errors.kafka_errors.get(error, UnknownError) - if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError): - log.error('Error loading topic metadata for %s: %s (%s)', - topic, error_type, error) - if topic not in topics: - continue - elif (error_type is LeaderNotAvailableError and - ignore_leadernotavailable): - continue - raise error_type(topic) - - self.topic_partitions[topic] = {} - for error, partition, leader, _, _ in partitions: - - self.topic_partitions[topic][partition] = leader - - # Populate topics_to_brokers dict - topic_part = TopicPartition(topic, partition) - - # Check for partition errors - if error: - error_type = kafka.errors.kafka_errors.get(error, UnknownError) - - # If No Leader, topics_to_brokers topic_partition -> None - if error_type is LeaderNotAvailableError: - log.error('No leader for topic %s partition %d', topic, partition) - self.topics_to_brokers[topic_part] = None - continue - - # If one of the replicas is unavailable -- ignore - # this error code is provided for admin purposes only - # we never talk to replicas, only the leader - elif error_type is ReplicaNotAvailableError: - log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition) - - else: - raise error_type(topic_part) - - # If Known Broker, topic_partition -> BrokerMetadata - if leader in self.brokers: - self.topics_to_brokers[topic_part] = self.brokers[leader] - - # If Unknown Broker, fake BrokerMetadata so we don't lose the id - # (not sure how this could happen. server could be in bad state) - else: - self.topics_to_brokers[topic_part] = BrokerMetadata( - leader, None, None, None - ) - - def send_metadata_request(self, payloads=(), fail_on_error=True, - callback=None): - encoder = KafkaProtocol.encode_metadata_request - decoder = KafkaProtocol.decode_metadata_response - - return self._send_broker_unaware_request(payloads, encoder, decoder) - - def send_consumer_metadata_request(self, payloads=(), fail_on_error=True, - callback=None): - encoder = KafkaProtocol.encode_consumer_metadata_request - decoder = KafkaProtocol.decode_consumer_metadata_response - - return self._send_broker_unaware_request(payloads, encoder, decoder) - - def send_produce_request(self, payloads=(), acks=1, timeout=1000, - fail_on_error=True, callback=None): - """ - Encode and send some ProduceRequests - - ProduceRequests will be grouped by (topic, partition) and then - sent to a specific broker. Output is a list of responses in the - same order as the list of payloads specified - - Arguments: - payloads (list of ProduceRequest): produce requests to send to kafka - ProduceRequest payloads must not contain duplicates for any - topic-partition. - acks (int, optional): how many acks the servers should receive from replica - brokers before responding to the request. If it is 0, the server - will not send any response. If it is 1, the server will wait - until the data is written to the local log before sending a - response. If it is -1, the server will wait until the message - is committed by all in-sync replicas before sending a response. - For any value > 1, the server will wait for this number of acks to - occur (but the server will never wait for more acknowledgements than - there are in-sync replicas). defaults to 1. - timeout (int, optional): maximum time in milliseconds the server can - await the receipt of the number of acks, defaults to 1000. - fail_on_error (bool, optional): raise exceptions on connection and - server response errors, defaults to True. - callback (function, optional): instead of returning the ProduceResponse, - first pass it through this function, defaults to None. - - Returns: - list of ProduceResponses, or callback results if supplied, in the - order of input payloads - """ - - encoder = functools.partial( - KafkaProtocol.encode_produce_request, - acks=acks, - timeout=timeout) - - if acks == 0: - decoder = None - else: - decoder = KafkaProtocol.decode_produce_response - - resps = self._send_broker_aware_request(payloads, encoder, decoder) - - return [resp if not callback else callback(resp) for resp in resps - if resp is not None and - (not fail_on_error or not self._raise_on_response_error(resp))] - - def send_fetch_request(self, payloads=(), fail_on_error=True, - callback=None, max_wait_time=100, min_bytes=4096): - """ - Encode and send a FetchRequest - - Payloads are grouped by topic and partition so they can be pipelined - to the same brokers. - """ - - encoder = functools.partial(KafkaProtocol.encode_fetch_request, - max_wait_time=max_wait_time, - min_bytes=min_bytes) - - resps = self._send_broker_aware_request( - payloads, encoder, - KafkaProtocol.decode_fetch_response) - - return [resp if not callback else callback(resp) for resp in resps - if not fail_on_error or not self._raise_on_response_error(resp)] - - def send_offset_request(self, payloads=(), fail_on_error=True, - callback=None): - resps = self._send_broker_aware_request( - payloads, - KafkaProtocol.encode_offset_request, - KafkaProtocol.decode_offset_response) - - return [resp if not callback else callback(resp) for resp in resps - if not fail_on_error or not self._raise_on_response_error(resp)] - - def send_list_offset_request(self, payloads=(), fail_on_error=True, - callback=None): - resps = self._send_broker_aware_request( - payloads, - KafkaProtocol.encode_list_offset_request, - KafkaProtocol.decode_list_offset_response) - - return [resp if not callback else callback(resp) for resp in resps - if not fail_on_error or not self._raise_on_response_error(resp)] - - def send_offset_commit_request(self, group, payloads=(), - fail_on_error=True, callback=None): - encoder = functools.partial(KafkaProtocol.encode_offset_commit_request, - group=group) - decoder = KafkaProtocol.decode_offset_commit_response - resps = self._send_broker_aware_request(payloads, encoder, decoder) - - return [resp if not callback else callback(resp) for resp in resps - if not fail_on_error or not self._raise_on_response_error(resp)] - - def send_offset_fetch_request(self, group, payloads=(), - fail_on_error=True, callback=None): - - encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, - group=group) - decoder = KafkaProtocol.decode_offset_fetch_response - resps = self._send_broker_aware_request(payloads, encoder, decoder) - - return [resp if not callback else callback(resp) for resp in resps - if not fail_on_error or not self._raise_on_response_error(resp)] - - def send_offset_fetch_request_kafka(self, group, payloads=(), - fail_on_error=True, callback=None): - - encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request, - group=group, from_kafka=True) - decoder = KafkaProtocol.decode_offset_fetch_response - resps = self._send_consumer_aware_request(group, payloads, encoder, decoder) - - return [resp if not callback else callback(resp) for resp in resps - if not fail_on_error or not self._raise_on_response_error(resp)] diff --git a/kafka/common.py b/kafka/common.py deleted file mode 100644 index 15e88eb..0000000 --- a/kafka/common.py +++ /dev/null @@ -1,4 +0,0 @@ -from __future__ import absolute_import - -from kafka.structs import * -from kafka.errors import * diff --git a/kafka/consumer/__init__.py b/kafka/consumer/__init__.py index 4b900ac..e09bcc1 100644 --- a/kafka/consumer/__init__.py +++ b/kafka/consumer/__init__.py @@ -1,9 +1,7 @@ from __future__ import absolute_import -from kafka.consumer.simple import SimpleConsumer -from kafka.consumer.multiprocess import MultiProcessConsumer from kafka.consumer.group import KafkaConsumer __all__ = [ - 'SimpleConsumer', 'MultiProcessConsumer', 'KafkaConsumer' + 'KafkaConsumer' ] diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py deleted file mode 100644 index a77ce7e..0000000 --- a/kafka/consumer/base.py +++ /dev/null @@ -1,232 +0,0 @@ -from __future__ import absolute_import - -import atexit -import logging -import numbers -from threading import Lock -import warnings - -from kafka.errors import ( - UnknownTopicOrPartitionError, check_error, KafkaError) -from kafka.structs import ( - OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload) -from kafka.util import ReentrantTimer - - -log = logging.getLogger('kafka.consumer') - -AUTO_COMMIT_MSG_COUNT = 100 -AUTO_COMMIT_INTERVAL = 5000 - -FETCH_DEFAULT_BLOCK_TIMEOUT = 1 -FETCH_MAX_WAIT_TIME = 100 -FETCH_MIN_BYTES = 4096 -FETCH_BUFFER_SIZE_BYTES = 4096 -MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8 - -ITER_TIMEOUT_SECONDS = 60 -NO_MESSAGES_WAIT_TIME_SECONDS = 0.1 -FULL_QUEUE_WAIT_TIME_SECONDS = 0.1 - -MAX_BACKOFF_SECONDS = 60 - -class Consumer(object): - """ - Base class to be used by other consumers. Not to be used directly - - This base class provides logic for - - * initialization and fetching metadata of partitions - * Auto-commit logic - * APIs for fetching pending message count - - """ - def __init__(self, client, group, topic, partitions=None, auto_commit=True, - auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, - auto_commit_every_t=AUTO_COMMIT_INTERVAL): - - warnings.warn('deprecated -- this class will be removed in a future' - ' release. Use KafkaConsumer instead.', - DeprecationWarning) - self.client = client - self.topic = topic - self.group = group - self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True) - self.offsets = {} - - if partitions is None: - partitions = self.client.get_partition_ids_for_topic(topic) - else: - assert all(isinstance(x, numbers.Integral) for x in partitions) - - # Variables for handling offset commits - self.commit_lock = Lock() - self.commit_timer = None - self.count_since_commit = 0 - self.auto_commit = auto_commit - self.auto_commit_every_n = auto_commit_every_n - self.auto_commit_every_t = auto_commit_every_t - - # Set up the auto-commit timer - if auto_commit is True and auto_commit_every_t is not None: - self.commit_timer = ReentrantTimer(auto_commit_every_t, - self.commit) - self.commit_timer.start() - - # Set initial offsets - if self.group is not None: - self.fetch_last_known_offsets(partitions) - else: - for partition in partitions: - self.offsets[partition] = 0 - - # Register a cleanup handler - def cleanup(obj): - obj.stop() - self._cleanup_func = cleanup - atexit.register(cleanup, self) - - self.partition_info = False # Do not return partition info in msgs - - def provide_partition_info(self): - """ - Indicates that partition info must be returned by the consumer - """ - self.partition_info = True - - def fetch_last_known_offsets(self, partitions=None): - if self.group is None: - raise ValueError('SimpleClient.group must not be None') - - if partitions is None: - partitions = self.client.get_partition_ids_for_topic(self.topic) - - responses = self.client.send_offset_fetch_request( - self.group, - [OffsetFetchRequestPayload(self.topic, p) for p in partitions], - fail_on_error=False - ) - - for resp in responses: - try: - check_error(resp) - # API spec says server won't set an error here - # but 0.8.1.1 does actually... - except UnknownTopicOrPartitionError: - pass - - # -1 offset signals no commit is currently stored - if resp.offset == -1: - self.offsets[resp.partition] = 0 - - # Otherwise we committed the stored offset - # and need to fetch the next one - else: - self.offsets[resp.partition] = resp.offset - - def commit(self, partitions=None): - """Commit stored offsets to Kafka via OffsetCommitRequest (v0) - - Keyword Arguments: - partitions (list): list of partitions to commit, default is to commit - all of them - - Returns: True on success, False on failure - """ - - # short circuit if nothing happened. This check is kept outside - # to prevent un-necessarily acquiring a lock for checking the state - if self.count_since_commit == 0: - return - - with self.commit_lock: - # Do this check again, just in case the state has changed - # during the lock acquiring timeout - if self.count_since_commit == 0: - return - - reqs = [] - if partitions is None: # commit all partitions - partitions = list(self.offsets.keys()) - - log.debug('Committing new offsets for %s, partitions %s', - self.topic, partitions) - for partition in partitions: - offset = self.offsets[partition] - log.debug('Commit offset %d in SimpleConsumer: ' - 'group=%s, topic=%s, partition=%s', - offset, self.group, self.topic, partition) - - reqs.append(OffsetCommitRequestPayload(self.topic, partition, - offset, None)) - - try: - self.client.send_offset_commit_request(self.group, reqs) - except KafkaError as e: - log.error('%s saving offsets: %s', e.__class__.__name__, e) - return False - else: - self.count_since_commit = 0 - return True - - def _auto_commit(self): - """ - Check if we have to commit based on number of messages and commit - """ - - # Check if we are supposed to do an auto-commit - if not self.auto_commit or self.auto_commit_every_n is None: - return - - if self.count_since_commit >= self.auto_commit_every_n: - self.commit() - - def stop(self): - if self.commit_timer is not None: - self.commit_timer.stop() - self.commit() - - if hasattr(self, '_cleanup_func'): - # Remove cleanup handler now that we've stopped - - # py3 supports unregistering - if hasattr(atexit, 'unregister'): - atexit.unregister(self._cleanup_func) # pylint: disable=no-member - - # py2 requires removing from private attribute... - else: - - # ValueError on list.remove() if the exithandler no longer - # exists is fine here - try: - atexit._exithandlers.remove( # pylint: disable=no-member - (self._cleanup_func, (self,), {})) - except ValueError: - pass - - del self._cleanup_func - - def pending(self, partitions=None): - """ - Gets the pending message count - - Keyword Arguments: - partitions (list): list of partitions to check for, default is to check all - """ - if partitions is None: - partitions = self.offsets.keys() - - total = 0 - reqs = [] - - for partition in partitions: - reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) - - resps = self.client.send_offset_request(reqs) - for resp in resps: - partition = resp.partition - pending = resp.offsets[0] - offset = self.offsets[partition] - total += pending - offset - - return total diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 15c2905..e9fd44c 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -1207,28 +1207,3 @@ class KafkaConsumer(six.Iterator): if self.config['consumer_timeout_ms'] >= 0: self._consumer_timeout = time.time() + ( self.config['consumer_timeout_ms'] / 1000.0) - - # Old KafkaConsumer methods are deprecated - def configure(self, **configs): - raise NotImplementedError( - 'deprecated -- initialize a new consumer') - - def set_topic_partitions(self, *topics): - raise NotImplementedError( - 'deprecated -- use subscribe() or assign()') - - def fetch_messages(self): - raise NotImplementedError( - 'deprecated -- use poll() or iterator interface') - - def get_partition_offsets(self, topic, partition, - request_time_ms, max_num_offsets): - raise NotImplementedError( - 'deprecated -- send an OffsetRequest with KafkaClient') - - def offsets(self, group=None): - raise NotImplementedError('deprecated -- use committed(partition)') - - def task_done(self, message): - raise NotImplementedError( - 'deprecated -- commit offsets manually if needed') diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py deleted file mode 100644 index 758bb92..0000000 --- a/kafka/consumer/multiprocess.py +++ /dev/null @@ -1,295 +0,0 @@ -from __future__ import absolute_import - -from collections import namedtuple -import logging -from multiprocessing import Process, Manager as MPManager -import time -import warnings - -from kafka.vendor.six.moves import queue # pylint: disable=import-error - -from kafka.errors import KafkaError -from kafka.consumer.base import ( - Consumer, - AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL, - NO_MESSAGES_WAIT_TIME_SECONDS, - FULL_QUEUE_WAIT_TIME_SECONDS, - MAX_BACKOFF_SECONDS, -) -from kafka.consumer.simple import SimpleConsumer - - -log = logging.getLogger(__name__) - -Events = namedtuple("Events", ["start", "pause", "exit"]) - - -def _mp_consume(client, group, topic, message_queue, size, events, **consumer_options): - """ - A child process worker which consumes messages based on the - notifications given by the controller process - - NOTE: Ideally, this should have been a method inside the Consumer - class. However, multiprocessing module has issues in windows. The - functionality breaks unless this function is kept outside of a class - """ - - # Initial interval for retries in seconds. - interval = 1 - while not events.exit.is_set(): - try: - # Make the child processes open separate socket connections - client.reinit() - - # We will start consumers without auto-commit. Auto-commit will be - # done by the master controller process. - consumer = SimpleConsumer(client, group, topic, - auto_commit=False, - auto_commit_every_n=None, - auto_commit_every_t=None, - **consumer_options) - - # Ensure that the consumer provides the partition information - consumer.provide_partition_info() - - while True: - # Wait till the controller indicates us to start consumption - events.start.wait() - - # If we are asked to quit, do so - if events.exit.is_set(): - break - - # Consume messages and add them to the queue. If the controller - # indicates a specific number of messages, follow that advice - count = 0 - - message = consumer.get_message() - if message: - while True: - try: - message_queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS) - break - except queue.Full: - if events.exit.is_set(): break - - count += 1 - - # We have reached the required size. The controller might have - # more than what he needs. Wait for a while. - # Without this logic, it is possible that we run into a big - # loop consuming all available messages before the controller - # can reset the 'start' event - if count == size.value: - events.pause.wait() - - else: - # In case we did not receive any message, give up the CPU for - # a while before we try again - time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) - - consumer.stop() - - except KafkaError as e: - # Retry with exponential backoff - log.exception("Problem communicating with Kafka, retrying in %d seconds...", interval) - time.sleep(interval) - interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS - - -class MultiProcessConsumer(Consumer): - """ - A consumer implementation that consumes partitions for a topic in - parallel using multiple processes - - Arguments: - client: a connected SimpleClient - group: a name for this consumer, used for offset storage and must be unique - If you are connecting to a server that does not support offset - commit/fetch (any prior to 0.8.1.1), then you *must* set this to None - topic: the topic to consume - - Keyword Arguments: - partitions: An optional list of partitions to consume the data from - auto_commit: default True. Whether or not to auto commit the offsets - auto_commit_every_n: default 100. How many messages to consume - before a commit - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - num_procs: Number of processes to start for consuming messages. - The available partitions will be divided among these processes - partitions_per_proc: Number of partitions to be allocated per process - (overrides num_procs) - - Auto commit details: - If both auto_commit_every_n and auto_commit_every_t are set, they will - reset one another when one is triggered. These triggers simply call the - commit method on this class. A manual call to commit will also reset - these triggers - """ - def __init__(self, client, group, topic, - partitions=None, - auto_commit=True, - auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, - auto_commit_every_t=AUTO_COMMIT_INTERVAL, - num_procs=1, - partitions_per_proc=0, - **simple_consumer_options): - - warnings.warn('This class has been deprecated and will be removed in a' - ' future release. Use KafkaConsumer instead', - DeprecationWarning) - - # Initiate the base consumer class - super(MultiProcessConsumer, self).__init__( - client, group, topic, - partitions=partitions, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) - - # Variables for managing and controlling the data flow from - # consumer child process to master - manager = MPManager() - self.queue = manager.Queue(1024) # Child consumers dump messages into this - self.events = Events( - start = manager.Event(), # Indicates the consumers to start fetch - exit = manager.Event(), # Requests the consumers to shutdown - pause = manager.Event()) # Requests the consumers to pause fetch - self.size = manager.Value('i', 0) # Indicator of number of messages to fetch - - # dict.keys() returns a view in py3 + it's not a thread-safe operation - # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3 - # It's safer to copy dict as it only runs during the init. - partitions = list(self.offsets.copy().keys()) - - # By default, start one consumer process for all partitions - # The logic below ensures that - # * we do not cross the num_procs limit - # * we have an even distribution of partitions among processes - - if partitions_per_proc: - num_procs = len(partitions) / partitions_per_proc - if num_procs * partitions_per_proc < len(partitions): - num_procs += 1 - - # The final set of chunks - chunks = [partitions[proc::num_procs] for proc in range(num_procs)] - - self.procs = [] - for chunk in chunks: - options = {'partitions': list(chunk)} - if simple_consumer_options: - simple_consumer_options.pop('partitions', None) - options.update(simple_consumer_options) - - args = (client.copy(), self.group, self.topic, self.queue, - self.size, self.events) - proc = Process(target=_mp_consume, args=args, kwargs=options) - proc.daemon = True - proc.start() - self.procs.append(proc) - - def __repr__(self): - return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \ - (self.group, self.topic, len(self.procs)) - - def stop(self): - # Set exit and start off all waiting consumers - self.events.exit.set() - self.events.pause.set() - self.events.start.set() - - for proc in self.procs: - proc.join() - proc.terminate() - - super(MultiProcessConsumer, self).stop() - - def __iter__(self): - """ - Iterator to consume the messages available on this consumer - """ - # Trigger the consumer procs to start off. - # We will iterate till there are no more messages available - self.size.value = 0 - self.events.pause.set() - - while True: - self.events.start.set() - try: - # We will block for a small while so that the consumers get - # a chance to run and put some messages in the queue - # TODO: This is a hack and will make the consumer block for - # at least one second. Need to find a better way of doing this - partition, message = self.queue.get(block=True, timeout=1) - except queue.Empty: - break - - # Count, check and commit messages if necessary - self.offsets[partition] = message.offset + 1 - self.events.start.clear() - self.count_since_commit += 1 - self._auto_commit() - yield message - - self.events.start.clear() - - def get_messages(self, count=1, block=True, timeout=10): - """ - Fetch the specified number of messages - - Keyword Arguments: - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till all messages are fetched. - If block is a positive integer the API will block until that - many messages are fetched. - timeout: When blocking is requested the function will block for - the specified time (in seconds) until count messages is - fetched. If None, it will block forever. - """ - messages = [] - - # Give a size hint to the consumers. Each consumer process will fetch - # a maximum of "count" messages. This will fetch more messages than - # necessary, but these will not be committed to kafka. Also, the extra - # messages can be provided in subsequent runs - self.size.value = count - self.events.pause.clear() - - if timeout is not None: - max_time = time.time() + timeout - - new_offsets = {} - while count > 0 and (timeout is None or timeout > 0): - # Trigger consumption only if the queue is empty - # By doing this, we will ensure that consumers do not - # go into overdrive and keep consuming thousands of - # messages when the user might need only a few - if self.queue.empty(): - self.events.start.set() - - block_next_call = block is True or block > len(messages) - try: - partition, message = self.queue.get(block_next_call, - timeout) - except queue.Empty: - break - - _msg = (partition, message) if self.partition_info else message - messages.append(_msg) - new_offsets[partition] = message.offset + 1 - count -= 1 - if timeout is not None: - timeout = max_time - time.time() - - self.size.value = 0 - self.events.start.clear() - self.events.pause.set() - - # Update and commit offsets if necessary - self.offsets.update(new_offsets) - self.count_since_commit += len(messages) - self._auto_commit() - - return messages diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py deleted file mode 100644 index a6a64a5..0000000 --- a/kafka/consumer/simple.py +++ /dev/null @@ -1,444 +0,0 @@ -from __future__ import absolute_import - -try: - from itertools import zip_longest as izip_longest, repeat # pylint: disable=E0611 -except ImportError: - from itertools import izip_longest as izip_longest, repeat # pylint: disable=E0611 -import logging -import sys -import time -import warnings - -from kafka.vendor import six -from kafka.vendor.six.moves import queue # pylint: disable=import-error - -from kafka.consumer.base import ( - Consumer, - FETCH_DEFAULT_BLOCK_TIMEOUT, - AUTO_COMMIT_MSG_COUNT, - AUTO_COMMIT_INTERVAL, - FETCH_MIN_BYTES, - FETCH_BUFFER_SIZE_BYTES, - MAX_FETCH_BUFFER_SIZE_BYTES, - FETCH_MAX_WAIT_TIME, - ITER_TIMEOUT_SECONDS, - NO_MESSAGES_WAIT_TIME_SECONDS -) -from kafka.errors import ( - KafkaError, ConsumerFetchSizeTooSmall, - UnknownTopicOrPartitionError, NotLeaderForPartitionError, - OffsetOutOfRangeError, FailedPayloadsError, check_error -) -from kafka.protocol.message import PartialMessage -from kafka.structs import FetchRequestPayload, OffsetRequestPayload - - -log = logging.getLogger(__name__) - - -class FetchContext(object): - """ - Class for managing the state of a consumer during fetch - """ - def __init__(self, consumer, block, timeout): - warnings.warn('deprecated - this class will be removed in a future' - ' release', DeprecationWarning) - self.consumer = consumer - self.block = block - - if block: - if not timeout: - timeout = FETCH_DEFAULT_BLOCK_TIMEOUT - self.timeout = timeout * 1000 - - def __enter__(self): - """Set fetch values based on blocking status""" - self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time - self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes - if self.block: - self.consumer.fetch_max_wait_time = self.timeout - self.consumer.fetch_min_bytes = 1 - else: - self.consumer.fetch_min_bytes = 0 - - def __exit__(self, type, value, traceback): - """Reset values""" - self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time - self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes - - -class SimpleConsumer(Consumer): - """ - A simple consumer implementation that consumes all/specified partitions - for a topic - - Arguments: - client: a connected SimpleClient - group: a name for this consumer, used for offset storage and must be unique - If you are connecting to a server that does not support offset - commit/fetch (any prior to 0.8.1.1), then you *must* set this to None - topic: the topic to consume - - Keyword Arguments: - partitions: An optional list of partitions to consume the data from - - auto_commit: default True. Whether or not to auto commit the offsets - - auto_commit_every_n: default 100. How many messages to consume - before a commit - - auto_commit_every_t: default 5000. How much time (in milliseconds) to - wait before commit - fetch_size_bytes: number of bytes to request in a FetchRequest - - buffer_size: default 4K. Initial number of bytes to tell kafka we - have available. This will double as needed. - - max_buffer_size: default 16K. Max number of bytes to tell kafka we have - available. None means no limit. - - iter_timeout: default None. How much time (in seconds) to wait for a - message in the iterator before exiting. None means no - timeout, so it will wait forever. - - auto_offset_reset: default largest. Reset partition offsets upon - OffsetOutOfRangeError. Valid values are largest and smallest. - Otherwise, do not reset the offsets and raise OffsetOutOfRangeError. - - Auto commit details: - If both auto_commit_every_n and auto_commit_every_t are set, they will - reset one another when one is triggered. These triggers simply call the - commit method on this class. A manual call to commit will also reset - these triggers - """ - def __init__(self, client, group, topic, auto_commit=True, partitions=None, - auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, - auto_commit_every_t=AUTO_COMMIT_INTERVAL, - fetch_size_bytes=FETCH_MIN_BYTES, - buffer_size=FETCH_BUFFER_SIZE_BYTES, - max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES, - iter_timeout=None, - auto_offset_reset='largest'): - warnings.warn('deprecated - this class will be removed in a future' - ' release. Use KafkaConsumer instead.', - DeprecationWarning) - super(SimpleConsumer, self).__init__( - client, group, topic, - partitions=partitions, - auto_commit=auto_commit, - auto_commit_every_n=auto_commit_every_n, - auto_commit_every_t=auto_commit_every_t) - - if max_buffer_size is not None and buffer_size > max_buffer_size: - raise ValueError('buffer_size (%d) is greater than ' - 'max_buffer_size (%d)' % - (buffer_size, max_buffer_size)) - self.buffer_size = buffer_size - self.max_buffer_size = max_buffer_size - self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME - self.fetch_min_bytes = fetch_size_bytes - self.fetch_offsets = self.offsets.copy() - self.iter_timeout = iter_timeout - self.auto_offset_reset = auto_offset_reset - self.queue = queue.Queue() - - def __repr__(self): - return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \ - (self.group, self.topic, str(self.offsets.keys())) - - def reset_partition_offset(self, partition): - """Update offsets using auto_offset_reset policy (smallest|largest) - - Arguments: - partition (int): the partition for which offsets should be updated - - Returns: Updated offset on success, None on failure - """ - LATEST = -1 - EARLIEST = -2 - if self.auto_offset_reset == 'largest': - reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)] - elif self.auto_offset_reset == 'smallest': - reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)] - else: - # Let's raise an reasonable exception type if user calls - # outside of an exception context - if sys.exc_info() == (None, None, None): - raise OffsetOutOfRangeError('Cannot reset partition offsets without a ' - 'valid auto_offset_reset setting ' - '(largest|smallest)') - # Otherwise we should re-raise the upstream exception - # b/c it typically includes additional data about - # the request that triggered it, and we do not want to drop that - raise # pylint: disable=E0704 - - # send_offset_request - log.info('Resetting topic-partition offset to %s for %s:%d', - self.auto_offset_reset, self.topic, partition) - try: - (resp, ) = self.client.send_offset_request(reqs) - except KafkaError as e: - log.error('%s sending offset request for %s:%d', - e.__class__.__name__, self.topic, partition) - else: - self.offsets[partition] = resp.offsets[0] - self.fetch_offsets[partition] = resp.offsets[0] - return resp.offsets[0] - - def seek(self, offset, whence=None, partition=None): - """ - Alter the current offset in the consumer, similar to fseek - - Arguments: - offset: how much to modify the offset - whence: where to modify it from, default is None - - * None is an absolute offset - * 0 is relative to the earliest available offset (head) - * 1 is relative to the current offset - * 2 is relative to the latest known offset (tail) - - partition: modify which partition, default is None. - If partition is None, would modify all partitions. - """ - - if whence is None: # set an absolute offset - if partition is None: - for tmp_partition in self.offsets: - self.offsets[tmp_partition] = offset - else: - self.offsets[partition] = offset - elif whence == 1: # relative to current position - if partition is None: - for tmp_partition, _offset in self.offsets.items(): - self.offsets[tmp_partition] = _offset + offset - else: - self.offsets[partition] += offset - elif whence in (0, 2): # relative to beginning or end - reqs = [] - deltas = {} - if partition is None: - # divide the request offset by number of partitions, - # distribute the remained evenly - (delta, rem) = divmod(offset, len(self.offsets)) - for tmp_partition, r in izip_longest(self.offsets.keys(), - repeat(1, rem), - fillvalue=0): - deltas[tmp_partition] = delta + r - - for tmp_partition in self.offsets.keys(): - if whence == 0: - reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1)) - elif whence == 2: - reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1)) - else: - pass - else: - deltas[partition] = offset - if whence == 0: - reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1)) - elif whence == 2: - reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1)) - else: - pass - - resps = self.client.send_offset_request(reqs) - for resp in resps: - self.offsets[resp.partition] = \ - resp.offsets[0] + deltas[resp.partition] - else: - raise ValueError('Unexpected value for `whence`, %d' % (whence,)) - - # Reset queue and fetch offsets since they are invalid - self.fetch_offsets = self.offsets.copy() - self.count_since_commit += 1 - if self.auto_commit: - self.commit() - - self.queue = queue.Queue() - - def get_messages(self, count=1, block=True, timeout=0.1): - """ - Fetch the specified number of messages - - Keyword Arguments: - count: Indicates the maximum number of messages to be fetched - block: If True, the API will block till all messages are fetched. - If block is a positive integer the API will block until that - many messages are fetched. - timeout: When blocking is requested the function will block for - the specified time (in seconds) until count messages is - fetched. If None, it will block forever. - """ - messages = [] - if timeout is not None: - timeout += time.time() - - new_offsets = {} - log.debug('getting %d messages', count) - while len(messages) < count: - block_time = timeout - time.time() - log.debug('calling _get_message block=%s timeout=%s', block, block_time) - block_next_call = block is True or block > len(messages) - result = self._get_message(block_next_call, block_time, - get_partition_info=True, - update_offset=False) - log.debug('got %s from _get_messages', result) - if not result: - if block_next_call and (timeout is None or time.time() <= timeout): - continue - break - - partition, message = result - _msg = (partition, message) if self.partition_info else message - messages.append(_msg) - new_offsets[partition] = message.offset + 1 - - # Update and commit offsets if necessary - self.offsets.update(new_offsets) - self.count_since_commit += len(messages) - self._auto_commit() - log.debug('got %d messages: %s', len(messages), messages) - return messages - - def get_message(self, block=True, timeout=0.1, get_partition_info=None): - return self._get_message(block, timeout, get_partition_info) - - def _get_message(self, block=True, timeout=0.1, get_partition_info=None, - update_offset=True): - """ - If no messages can be fetched, returns None. - If get_partition_info is None, it defaults to self.partition_info - If get_partition_info is True, returns (partition, message) - If get_partition_info is False, returns message - """ - start_at = time.time() - while self.queue.empty(): - # We're out of messages, go grab some more. - log.debug('internal queue empty, fetching more messages') - with FetchContext(self, block, timeout): - self._fetch() - - if not block or time.time() > (start_at + timeout): - break - - try: - partition, message = self.queue.get_nowait() - - if update_offset: - # Update partition offset - self.offsets[partition] = message.offset + 1 - - # Count, check and commit messages if necessary - self.count_since_commit += 1 - self._auto_commit() - - if get_partition_info is None: - get_partition_info = self.partition_info - if get_partition_info: - return partition, message - else: - return message - except queue.Empty: - log.debug('internal queue empty after fetch - returning None') - return None - - def __iter__(self): - if self.iter_timeout is None: - timeout = ITER_TIMEOUT_SECONDS - else: - timeout = self.iter_timeout - - while True: - message = self.get_message(True, timeout) - if message: - yield message - elif self.iter_timeout is None: - # We did not receive any message yet but we don't have a - # timeout, so give up the CPU for a while before trying again - time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS) - else: - # Timed out waiting for a message - break - - def _fetch(self): - # Create fetch request payloads for all the partitions - partitions = dict((p, self.buffer_size) - for p in self.fetch_offsets.keys()) - while partitions: - requests = [] - for partition, buffer_size in six.iteritems(partitions): - requests.append(FetchRequestPayload(self.topic, partition, - self.fetch_offsets[partition], - buffer_size)) - # Send request - responses = self.client.send_fetch_request( - requests, - max_wait_time=int(self.fetch_max_wait_time), - min_bytes=self.fetch_min_bytes, - fail_on_error=False - ) - - retry_partitions = {} - for resp in responses: - - try: - check_error(resp) - except UnknownTopicOrPartitionError: - log.error('UnknownTopicOrPartitionError for %s:%d', - resp.topic, resp.partition) - self.client.reset_topic_metadata(resp.topic) - raise - except NotLeaderForPartitionError: - log.error('NotLeaderForPartitionError for %s:%d', - resp.topic, resp.partition) - self.client.reset_topic_metadata(resp.topic) - continue - except OffsetOutOfRangeError: - log.warning('OffsetOutOfRangeError for %s:%d. ' - 'Resetting partition offset...', - resp.topic, resp.partition) - self.reset_partition_offset(resp.partition) - # Retry this partition - retry_partitions[resp.partition] = partitions[resp.partition] - continue - except FailedPayloadsError as e: - log.warning('FailedPayloadsError for %s:%d', - e.payload.topic, e.payload.partition) - # Retry this partition - retry_partitions[e.payload.partition] = partitions[e.payload.partition] - continue - - partition = resp.partition - buffer_size = partitions[partition] - - # Check for partial message - if resp.messages and isinstance(resp.messages[-1].message, PartialMessage): - - # If buffer is at max and all we got was a partial message - # raise ConsumerFetchSizeTooSmall - if (self.max_buffer_size is not None and - buffer_size == self.max_buffer_size and - len(resp.messages) == 1): - - log.error('Max fetch size %d too small', self.max_buffer_size) - raise ConsumerFetchSizeTooSmall() - - if self.max_buffer_size is None: - buffer_size *= 2 - else: - buffer_size = min(buffer_size * 2, self.max_buffer_size) - log.warning('Fetch size too small, increase to %d (2x) ' - 'and retry', buffer_size) - retry_partitions[partition] = buffer_size - resp.messages.pop() - - for message in resp.messages: - if message.offset < self.fetch_offsets[partition]: - log.debug('Skipping message %s because its offset is less than the consumer offset', - message) - continue - # Put the message in our queue - self.queue.put((partition, message)) - self.fetch_offsets[partition] = message.offset + 1 - partitions = retry_partitions diff --git a/kafka/context.py b/kafka/context.py deleted file mode 100644 index 1ebc71d..0000000 --- a/kafka/context.py +++ /dev/null @@ -1,178 +0,0 @@ -""" -Context manager to commit/rollback consumer offsets. -""" -from __future__ import absolute_import - -from logging import getLogger - -from kafka.errors import check_error, OffsetOutOfRangeError -from kafka.structs import OffsetCommitRequestPayload - - -class OffsetCommitContext(object): - """ - Provides commit/rollback semantics around a `SimpleConsumer`. - - Usage assumes that `auto_commit` is disabled, that messages are consumed in - batches, and that the consuming process will record its own successful - processing of each message. Both the commit and rollback operations respect - a "high-water mark" to ensure that last unsuccessfully processed message - will be retried. - - Example: - - .. code:: python - - consumer = SimpleConsumer(client, group, topic, auto_commit=False) - consumer.provide_partition_info() - consumer.fetch_last_known_offsets() - - while some_condition: - with OffsetCommitContext(consumer) as context: - messages = consumer.get_messages(count, block=False) - - for partition, message in messages: - if can_process(message): - context.mark(partition, message.offset) - else: - break - - if not context: - sleep(delay) - - - These semantics allow for deferred message processing (e.g. if `can_process` - compares message time to clock time) and for repeated processing of the last - unsuccessful message (until some external error is resolved). - """ - - def __init__(self, consumer): - """ - :param consumer: an instance of `SimpleConsumer` - """ - self.consumer = consumer - self.initial_offsets = None - self.high_water_mark = None - self.logger = getLogger("kafka.context") - - def mark(self, partition, offset): - """ - Set the high-water mark in the current context. - - In order to know the current partition, it is helpful to initialize - the consumer to provide partition info via: - - .. code:: python - - consumer.provide_partition_info() - - """ - max_offset = max(offset + 1, self.high_water_mark.get(partition, 0)) - - self.logger.debug("Setting high-water mark to: %s", - {partition: max_offset}) - - self.high_water_mark[partition] = max_offset - - def __nonzero__(self): - """ - Return whether any operations were marked in the context. - """ - return bool(self.high_water_mark) - - def __enter__(self): - """ - Start a new context: - - - Record the initial offsets for rollback - - Reset the high-water mark - """ - self.initial_offsets = dict(self.consumer.offsets) - self.high_water_mark = dict() - - self.logger.debug("Starting context at: %s", self.initial_offsets) - - return self - - def __exit__(self, exc_type, exc_value, traceback): - """ - End a context. - - - If there was no exception, commit up to the current high-water mark. - - If there was an offset of range error, attempt to find the correct - initial offset. - - If there was any other error, roll back to the initial offsets. - """ - if exc_type is None: - self.commit() - elif isinstance(exc_value, OffsetOutOfRangeError): - self.handle_out_of_range() - return True - else: - self.rollback() - - def commit(self): - """ - Commit this context's offsets: - - - If the high-water mark has moved, commit up to and position the - consumer at the high-water mark. - - Otherwise, reset to the consumer to the initial offsets. - """ - if self.high_water_mark: - self.logger.info("Committing offsets: %s", self.high_water_mark) - self.commit_partition_offsets(self.high_water_mark) - self.update_consumer_offsets(self.high_water_mark) - else: - self.update_consumer_offsets(self.initial_offsets) - - def rollback(self): - """ - Rollback this context: - - - Position the consumer at the initial offsets. - """ - self.logger.info("Rolling back context: %s", self.initial_offsets) - self.update_consumer_offsets(self.initial_offsets) - - def commit_partition_offsets(self, partition_offsets): - """ - Commit explicit partition/offset pairs. - """ - self.logger.debug("Committing partition offsets: %s", partition_offsets) - - commit_requests = [ - OffsetCommitRequestPayload(self.consumer.topic, partition, offset, None) - for partition, offset in partition_offsets.items() - ] - commit_responses = self.consumer.client.send_offset_commit_request( - self.consumer.group, - commit_requests, - ) - for commit_response in commit_responses: - check_error(commit_response) - - def update_consumer_offsets(self, partition_offsets): - """ - Update consumer offsets to explicit positions. - """ - self.logger.debug("Updating consumer offsets to: %s", partition_offsets) - - for partition, offset in partition_offsets.items(): - self.consumer.offsets[partition] = offset - - # consumer keeps other offset states beyond its `offsets` dictionary, - # a relative seek with zero delta forces the consumer to reset to the - # current value of the `offsets` dictionary - self.consumer.seek(0, 1) - - def handle_out_of_range(self): - """ - Handle out of range condition by seeking to the beginning of valid - ranges. - - This assumes that an out of range doesn't happen by seeking past the end - of valid ranges -- which is far less likely. - """ - self.logger.info("Seeking beginning of partition on out of range error") - self.consumer.seek(0, 0) diff --git a/kafka/errors.py b/kafka/errors.py index abef2c5..6da2908 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -472,22 +472,6 @@ class ConnectionError(KafkaConnectionError): """Deprecated""" -class BufferUnderflowError(KafkaError): - pass - - -class ChecksumError(KafkaError): - pass - - -class ConsumerFetchSizeTooSmall(KafkaError): - pass - - -class ConsumerNoMoreData(KafkaError): - pass - - class ProtocolError(KafkaError): pass diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py index a9dbbdc..21a3bbb 100644 --- a/kafka/partitioner/__init__.py +++ b/kafka/partitioner/__init__.py @@ -1,10 +1,8 @@ from __future__ import absolute_import -from kafka.partitioner.default import DefaultPartitioner -from kafka.partitioner.hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner -from kafka.partitioner.roundrobin import RoundRobinPartitioner +from kafka.partitioner.default import DefaultPartitioner, murmur2 + __all__ = [ - 'DefaultPartitioner', 'RoundRobinPartitioner', 'HashedPartitioner', - 'Murmur2Partitioner', 'LegacyPartitioner' + 'DefaultPartitioner', 'murmur2' ] diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py deleted file mode 100644 index 0e36253..0000000 --- a/kafka/partitioner/base.py +++ /dev/null @@ -1,27 +0,0 @@ -from __future__ import absolute_import - - -class Partitioner(object): - """ - Base class for a partitioner - """ - def __init__(self, partitions=None): - """ - Initialize the partitioner - - Arguments: - partitions: A list of available partitions (during startup) OPTIONAL. - """ - self.partitions = partitions - - def __call__(self, key, all_partitions=None, available_partitions=None): - """ - Takes a string key, num_partitions and available_partitions as argument and returns - a partition to be used for the message - - Arguments: - key: the key to use for partitioning. - all_partitions: a list of the topic's partitions. - available_partitions: a list of the broker's currently avaliable partitions(optional). - """ - raise NotImplementedError('partition function has to be implemented') diff --git a/kafka/partitioner/default.py b/kafka/partitioner/default.py index e4d9df5..d0914c6 100644 --- a/kafka/partitioner/default.py +++ b/kafka/partitioner/default.py @@ -2,7 +2,7 @@ from __future__ import absolute_import import random -from kafka.partitioner.hashed import murmur2 +from kafka.vendor import six class DefaultPartitioner(object): @@ -30,3 +30,73 @@ class DefaultPartitioner(object): idx &= 0x7fffffff idx %= len(all_partitions) return all_partitions[idx] + + +# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 +def murmur2(data): + """Pure-python Murmur2 implementation. + + Based on java client, see org.apache.kafka.common.utils.Utils.murmur2 + + Args: + data (bytes): opaque bytes + + Returns: MurmurHash2 of data + """ + # Python2 bytes is really a str, causing the bitwise operations below to fail + # so convert to bytearray. + if six.PY2: + data = bytearray(bytes(data)) + + length = len(data) + seed = 0x9747b28c + # 'm' and 'r' are mixing constants generated offline. + # They're not really 'magic', they just happen to work well. + m = 0x5bd1e995 + r = 24 + + # Initialize the hash to a random value + h = seed ^ length + length4 = length // 4 + + for i in range(length4): + i4 = i * 4 + k = ((data[i4 + 0] & 0xff) + + ((data[i4 + 1] & 0xff) << 8) + + ((data[i4 + 2] & 0xff) << 16) + + ((data[i4 + 3] & 0xff) << 24)) + k &= 0xffffffff + k *= m + k &= 0xffffffff + k ^= (k % 0x100000000) >> r # k ^= k >>> r + k &= 0xffffffff + k *= m + k &= 0xffffffff + + h *= m + h &= 0xffffffff + h ^= k + h &= 0xffffffff + + # Handle the last few bytes of the input array + extra_bytes = length % 4 + if extra_bytes >= 3: + h ^= (data[(length & ~3) + 2] & 0xff) << 16 + h &= 0xffffffff + if extra_bytes >= 2: + h ^= (data[(length & ~3) + 1] & 0xff) << 8 + h &= 0xffffffff + if extra_bytes >= 1: + h ^= (data[length & ~3] & 0xff) + h &= 0xffffffff + h *= m + h &= 0xffffffff + + h ^= (h % 0x100000000) >> 13 # h >>> 13; + h &= 0xffffffff + h *= m + h &= 0xffffffff + h ^= (h % 0x100000000) >> 15 # h >>> 15; + h &= 0xffffffff + + return h diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py deleted file mode 100644 index be92daf..0000000 --- a/kafka/partitioner/hashed.py +++ /dev/null @@ -1,118 +0,0 @@ -from __future__ import absolute_import - -from kafka.vendor import six - -from kafka.partitioner.base import Partitioner - - -class Murmur2Partitioner(Partitioner): - """ - Implements a partitioner which selects the target partition based on - the hash of the key. Attempts to apply the same hashing - function as mainline java client. - """ - def __call__(self, key, partitions=None, available=None): - if available: - return self.partition(key, available) - return self.partition(key, partitions) - - def partition(self, key, partitions=None): - if not partitions: - partitions = self.partitions - - # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69 - idx = (murmur2(key) & 0x7fffffff) % len(partitions) - - return partitions[idx] - - -class LegacyPartitioner(object): - """DEPRECATED -- See Issue 374 - - Implements a partitioner which selects the target partition based on - the hash of the key - """ - def __init__(self, partitions): - self.partitions = partitions - - def partition(self, key, partitions=None): - if not partitions: - partitions = self.partitions - size = len(partitions) - idx = hash(key) % size - - return partitions[idx] - - -# Default will change to Murmur2 in 0.10 release -HashedPartitioner = LegacyPartitioner - - -# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 -def murmur2(data): - """Pure-python Murmur2 implementation. - - Based on java client, see org.apache.kafka.common.utils.Utils.murmur2 - - Args: - data (bytes): opaque bytes - - Returns: MurmurHash2 of data - """ - # Python2 bytes is really a str, causing the bitwise operations below to fail - # so convert to bytearray. - if six.PY2: - data = bytearray(bytes(data)) - - length = len(data) - seed = 0x9747b28c - # 'm' and 'r' are mixing constants generated offline. - # They're not really 'magic', they just happen to work well. - m = 0x5bd1e995 - r = 24 - - # Initialize the hash to a random value - h = seed ^ length - length4 = length // 4 - - for i in range(length4): - i4 = i * 4 - k = ((data[i4 + 0] & 0xff) + - ((data[i4 + 1] & 0xff) << 8) + - ((data[i4 + 2] & 0xff) << 16) + - ((data[i4 + 3] & 0xff) << 24)) - k &= 0xffffffff - k *= m - k &= 0xffffffff - k ^= (k % 0x100000000) >> r # k ^= k >>> r - k &= 0xffffffff - k *= m - k &= 0xffffffff - - h *= m - h &= 0xffffffff - h ^= k - h &= 0xffffffff - - # Handle the last few bytes of the input array - extra_bytes = length % 4 - if extra_bytes >= 3: - h ^= (data[(length & ~3) + 2] & 0xff) << 16 - h &= 0xffffffff - if extra_bytes >= 2: - h ^= (data[(length & ~3) + 1] & 0xff) << 8 - h &= 0xffffffff - if extra_bytes >= 1: - h ^= (data[length & ~3] & 0xff) - h &= 0xffffffff - h *= m - h &= 0xffffffff - - h ^= (h % 0x100000000) >> 13 # h >>> 13; - h &= 0xffffffff - h *= m - h &= 0xffffffff - h ^= (h % 0x100000000) >> 15 # h >>> 15; - h &= 0xffffffff - - return h diff --git a/kafka/partitioner/roundrobin.py b/kafka/partitioner/roundrobin.py deleted file mode 100644 index e68c372..0000000 --- a/kafka/partitioner/roundrobin.py +++ /dev/null @@ -1,70 +0,0 @@ -from __future__ import absolute_import - -from kafka.partitioner.base import Partitioner - - -class RoundRobinPartitioner(Partitioner): - def __init__(self, partitions=None): - self.partitions_iterable = CachedPartitionCycler(partitions) - if partitions: - self._set_partitions(partitions) - else: - self.partitions = None - - def __call__(self, key, all_partitions=None, available_partitions=None): - if available_partitions: - cur_partitions = available_partitions - else: - cur_partitions = all_partitions - if not self.partitions: - self._set_partitions(cur_partitions) - elif cur_partitions != self.partitions_iterable.partitions and cur_partitions is not None: - self._set_partitions(cur_partitions) - return next(self.partitions_iterable) - - def _set_partitions(self, available_partitions): - self.partitions = available_partitions - self.partitions_iterable.set_partitions(available_partitions) - - def partition(self, key, all_partitions=None, available_partitions=None): - return self.__call__(key, all_partitions, available_partitions) - - -class CachedPartitionCycler(object): - def __init__(self, partitions=None): - self.partitions = partitions - if partitions: - assert type(partitions) is list - self.cur_pos = None - - def __next__(self): - return self.next() - - @staticmethod - def _index_available(cur_pos, partitions): - return cur_pos < len(partitions) - - def set_partitions(self, partitions): - if self.cur_pos: - if not self._index_available(self.cur_pos, partitions): - self.cur_pos = 0 - self.partitions = partitions - return None - - self.partitions = partitions - next_item = self.partitions[self.cur_pos] - if next_item in partitions: - self.cur_pos = partitions.index(next_item) - else: - self.cur_pos = 0 - return None - self.partitions = partitions - - def next(self): - assert self.partitions is not None - if self.cur_pos is None or not self._index_available(self.cur_pos, self.partitions): - self.cur_pos = 1 - return self.partitions[0] - cur_item = self.partitions[self.cur_pos] - self.cur_pos += 1 - return cur_item diff --git a/kafka/producer/__init__.py b/kafka/producer/__init__.py index 54fd8d2..576c772 100644 --- a/kafka/producer/__init__.py +++ b/kafka/producer/__init__.py @@ -1,10 +1,7 @@ from __future__ import absolute_import from kafka.producer.kafka import KafkaProducer -from kafka.producer.simple import SimpleProducer -from kafka.producer.keyed import KeyedProducer __all__ = [ - 'KafkaProducer', - 'SimpleProducer', 'KeyedProducer' # deprecated + 'KafkaProducer' ] diff --git a/kafka/producer/base.py b/kafka/producer/base.py deleted file mode 100644 index b323966..0000000 --- a/kafka/producer/base.py +++ /dev/null @@ -1,482 +0,0 @@ -from __future__ import absolute_import - -import atexit -import logging -import time - -try: - from queue import Empty, Full, Queue # pylint: disable=import-error -except ImportError: - from Queue import Empty, Full, Queue # pylint: disable=import-error -from collections import defaultdict - -from threading import Thread, Event - -from kafka.vendor import six - -from kafka.errors import ( - kafka_errors, UnsupportedCodecError, FailedPayloadsError, - RequestTimedOutError, AsyncProducerQueueFull, UnknownError, - RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES) -from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set -from kafka.structs import ( - ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions) - -log = logging.getLogger('kafka.producer') - -BATCH_SEND_DEFAULT_INTERVAL = 20 -BATCH_SEND_MSG_COUNT = 20 - -# unlimited -ASYNC_QUEUE_MAXSIZE = 0 -ASYNC_QUEUE_PUT_TIMEOUT = 0 -# unlimited retries by default -ASYNC_RETRY_LIMIT = None -ASYNC_RETRY_BACKOFF_MS = 100 -ASYNC_RETRY_ON_TIMEOUTS = True -ASYNC_LOG_MESSAGES_ON_ERROR = True - -STOP_ASYNC_PRODUCER = -1 -ASYNC_STOP_TIMEOUT_SECS = 30 - -SYNC_FAIL_ON_ERROR_DEFAULT = True - - -def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout, retry_options, stop_event, - log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, - stop_timeout=ASYNC_STOP_TIMEOUT_SECS, - codec_compresslevel=None): - """Private method to manage producing messages asynchronously - - Listens on the queue for a specified number of messages or until - a specified timeout and then sends messages to the brokers in grouped - requests (one per broker). - - Messages placed on the queue should be tuples that conform to this format: - ((topic, partition), message, key) - - Currently does not mark messages with task_done. Do not attempt to - :meth:`join`! - - Arguments: - queue (threading.Queue): the queue from which to get messages - client (kafka.SimpleClient): instance to use for communicating - with brokers - codec (kafka.protocol.ALL_CODECS): compression codec to use - batch_time (int): interval in seconds to send message batches - batch_size (int): count of messages that will trigger an immediate send - req_acks: required acks to use with ProduceRequests. see server protocol - ack_timeout: timeout to wait for required acks. see server protocol - retry_options (RetryOptions): settings for retry limits, backoff etc - stop_event (threading.Event): event to monitor for shutdown signal. - when this event is 'set', the producer will stop sending messages. - log_messages_on_error (bool, optional): log stringified message-contents - on any produce error, otherwise only log a hash() of the contents, - defaults to True. - stop_timeout (int or float, optional): number of seconds to continue - retrying messages after stop_event is set, defaults to 30. - """ - request_tries = {} - - while not stop_event.is_set(): - try: - client.reinit() - except Exception as e: - log.warning('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms) - time.sleep(float(retry_options.backoff_ms) / 1000) - else: - break - - stop_at = None - while not (stop_event.is_set() and queue.empty() and not request_tries): - - # Handle stop_timeout - if stop_event.is_set(): - if not stop_at: - stop_at = stop_timeout + time.time() - if time.time() > stop_at: - log.debug('Async producer stopping due to stop_timeout') - break - - timeout = batch_time - count = batch_size - send_at = time.time() + timeout - msgset = defaultdict(list) - - # Merging messages will require a bit more work to manage correctly - # for now, don't look for new batches if we have old ones to retry - if request_tries: - count = 0 - log.debug('Skipping new batch collection to handle retries') - else: - log.debug('Batching size: %s, timeout: %s', count, timeout) - - # Keep fetching till we gather enough messages or a - # timeout is reached - while count > 0 and timeout >= 0: - try: - topic_partition, msg, key = queue.get(timeout=timeout) - except Empty: - break - - # Check if the controller has requested us to stop - if topic_partition == STOP_ASYNC_PRODUCER: - stop_event.set() - break - - # Adjust the timeout to match the remaining period - count -= 1 - timeout = send_at - time.time() - msgset[topic_partition].append((msg, key)) - - # Send collected requests upstream - for topic_partition, msg in msgset.items(): - messages = create_message_set(msg, codec, key, codec_compresslevel) - req = ProduceRequestPayload( - topic_partition.topic, - topic_partition.partition, - tuple(messages)) - request_tries[req] = 0 - - if not request_tries: - continue - - reqs_to_retry, error_cls = [], None - retry_state = { - 'do_backoff': False, - 'do_refresh': False - } - - def _handle_error(error_cls, request): - if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)): - reqs_to_retry.append(request) - if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES): - retry_state['do_backoff'] |= True - if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES): - retry_state['do_refresh'] |= True - - requests = list(request_tries.keys()) - log.debug('Sending: %s', requests) - responses = client.send_produce_request(requests, - acks=req_acks, - timeout=ack_timeout, - fail_on_error=False) - - log.debug('Received: %s', responses) - for i, response in enumerate(responses): - error_cls = None - if isinstance(response, FailedPayloadsError): - error_cls = response.__class__ - orig_req = response.payload - - elif isinstance(response, ProduceResponsePayload) and response.error: - error_cls = kafka_errors.get(response.error, UnknownError) - orig_req = requests[i] - - if error_cls: - _handle_error(error_cls, orig_req) - log.error('%s sending ProduceRequestPayload (#%d of %d) ' - 'to %s:%d with msgs %s', - error_cls.__name__, (i + 1), len(requests), - orig_req.topic, orig_req.partition, - orig_req.messages if log_messages_on_error - else hash(orig_req.messages)) - - if not reqs_to_retry: - request_tries = {} - continue - - # doing backoff before next retry - if retry_state['do_backoff'] and retry_options.backoff_ms: - log.warning('Async producer backoff for %s(ms) before retrying', retry_options.backoff_ms) - time.sleep(float(retry_options.backoff_ms) / 1000) - - # refresh topic metadata before next retry - if retry_state['do_refresh']: - log.warning('Async producer forcing metadata refresh metadata before retrying') - try: - client.load_metadata_for_topics() - except Exception: - log.exception("Async producer couldn't reload topic metadata.") - - # Apply retry limit, dropping messages that are over - request_tries = dict( - (key, count + 1) - for (key, count) in request_tries.items() - if key in reqs_to_retry - and (retry_options.limit is None - or (count < retry_options.limit)) - ) - - # Log messages we are going to retry - for orig_req in request_tries.keys(): - log.info('Retrying ProduceRequestPayload to %s:%d with msgs %s', - orig_req.topic, orig_req.partition, - orig_req.messages if log_messages_on_error - else hash(orig_req.messages)) - - if request_tries or not queue.empty(): - log.error('Stopped producer with %d unsent messages', len(request_tries) + queue.qsize()) - - -class Producer(object): - """ - Base class to be used by producers - - Arguments: - client (kafka.SimpleClient): instance to use for broker - communications. If async_send=True, the background thread will use - :meth:`client.copy`, which is expected to return a thread-safe - object. - codec (kafka.protocol.ALL_CODECS): compression codec to use. - req_acks (int, optional): A value indicating the acknowledgements that - the server must receive before responding to the request, - defaults to 1 (local ack). - ack_timeout (int, optional): millisecond timeout to wait for the - configured req_acks, defaults to 1000. - sync_fail_on_error (bool, optional): whether sync producer should - raise exceptions (True), or just return errors (False), - defaults to True. - async_send (bool, optional): send message using a background thread, - defaults to False. - batch_send_every_n (int, optional): If async_send is True, messages are - sent in batches of this size, defaults to 20. - batch_send_every_t (int or float, optional): If async_send is True, - messages are sent immediately after this timeout in seconds, even - if there are fewer than batch_send_every_n, defaults to 20. - async_retry_limit (int, optional): number of retries for failed messages - or None for unlimited, defaults to None / unlimited. - async_retry_backoff_ms (int, optional): milliseconds to backoff on - failed messages, defaults to 100. - async_retry_on_timeouts (bool, optional): whether to retry on - RequestTimedOutError, defaults to True. - async_queue_maxsize (int, optional): limit to the size of the - internal message queue in number of messages (not size), defaults - to 0 (no limit). - async_queue_put_timeout (int or float, optional): timeout seconds - for queue.put in send_messages for async producers -- will only - apply if async_queue_maxsize > 0 and the queue is Full, - defaults to 0 (fail immediately on full queue). - async_log_messages_on_error (bool, optional): set to False and the - async producer will only log hash() contents on failed produce - requests, defaults to True (log full messages). Hash logging - will not allow you to identify the specific message that failed, - but it will allow you to match failures with retries. - async_stop_timeout (int or float, optional): seconds to continue - attempting to send queued messages after :meth:`producer.stop`, - defaults to 30. - - Deprecated Arguments: - async (bool, optional): send message using a background thread, - defaults to False. Deprecated, use 'async_send' - batch_send (bool, optional): If True, messages are sent by a background - thread in batches, defaults to False. Deprecated, use 'async_send' - """ - ACK_NOT_REQUIRED = 0 # No ack is required - ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log - ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed - DEFAULT_ACK_TIMEOUT = 1000 - - def __init__(self, client, - req_acks=ACK_AFTER_LOCAL_WRITE, - ack_timeout=DEFAULT_ACK_TIMEOUT, - codec=None, - codec_compresslevel=None, - sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT, - async_send=False, - batch_send=False, # deprecated, use async_send - batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - async_retry_limit=ASYNC_RETRY_LIMIT, - async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS, - async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS, - async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, - async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT, - async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR, - async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS, - **kwargs): - - # async renamed async_send for python3.7 support - if 'async' in kwargs: - log.warning('Deprecated async option found -- use async_send') - async_send = kwargs['async'] - - if async_send: - assert batch_send_every_n > 0 - assert batch_send_every_t > 0 - assert async_queue_maxsize >= 0 - - self.client = client - self.async_send = async_send - self.req_acks = req_acks - self.ack_timeout = ack_timeout - self.stopped = False - - if codec is None: - codec = CODEC_NONE - elif codec not in ALL_CODECS: - raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,)) - - self.codec = codec - self.codec_compresslevel = codec_compresslevel - - if self.async_send: - # Messages are sent through this queue - self.queue = Queue(async_queue_maxsize) - self.async_queue_put_timeout = async_queue_put_timeout - async_retry_options = RetryOptions( - limit=async_retry_limit, - backoff_ms=async_retry_backoff_ms, - retry_on_timeouts=async_retry_on_timeouts) - self.thread_stop_event = Event() - self.thread = Thread( - target=_send_upstream, - args=(self.queue, self.client.copy(), self.codec, - batch_send_every_t, batch_send_every_n, - self.req_acks, self.ack_timeout, - async_retry_options, self.thread_stop_event), - kwargs={'log_messages_on_error': async_log_messages_on_error, - 'stop_timeout': async_stop_timeout, - 'codec_compresslevel': self.codec_compresslevel} - ) - - # Thread will die if main thread exits - self.thread.daemon = True - self.thread.start() - - def cleanup(obj): - if not obj.stopped: - obj.stop() - self._cleanup_func = cleanup - atexit.register(cleanup, self) - else: - self.sync_fail_on_error = sync_fail_on_error - - def send_messages(self, topic, partition, *msg): - """Helper method to send produce requests. - - Note that msg type *must* be encoded to bytes by user. Passing unicode - message will not work, for example you should encode before calling - send_messages via something like `unicode_message.encode('utf-8')` - All messages will set the message 'key' to None. - - Arguments: - topic (str): name of topic for produce request - partition (int): partition number for produce request - *msg (bytes): one or more message payloads - - Returns: - ResponseRequest returned by server - - Raises: - FailedPayloadsError: low-level connection error, can be caused by - networking failures, or a malformed request. - KafkaUnavailableError: all known brokers are down when attempting - to refresh metadata. - LeaderNotAvailableError: topic or partition is initializing or - a broker failed and leadership election is in progress. - NotLeaderForPartitionError: metadata is out of sync; the broker - that the request was sent to is not the leader for the topic - or partition. - UnknownTopicOrPartitionError: the topic or partition has not - been created yet and auto-creation is not available. - AsyncProducerQueueFull: in async mode, if too many messages are - unsent and remain in the internal queue. - """ - return self._send_messages(topic, partition, *msg) - - def _send_messages(self, topic, partition, *msg, **kwargs): - key = kwargs.pop('key', None) - - # Guarantee that msg is actually a list or tuple (should always be true) - if not isinstance(msg, (list, tuple)): - raise TypeError("msg is not a list or tuple!") - - for m in msg: - # The protocol allows to have key & payload with null values both, - # (https://goo.gl/o694yN) but having (null,null) pair doesn't make sense. - if m is None: - if key is None: - raise TypeError("key and payload can't be null in one") - # Raise TypeError if any non-null message is not encoded as bytes - elif not isinstance(m, six.binary_type): - raise TypeError("all produce message payloads must be null or type bytes") - - # Raise TypeError if the key is not encoded as bytes - if key is not None and not isinstance(key, six.binary_type): - raise TypeError("the key must be type bytes") - - if self.async_send: - for idx, m in enumerate(msg): - try: - item = (TopicPartition(topic, partition), m, key) - if self.async_queue_put_timeout == 0: - self.queue.put_nowait(item) - else: - self.queue.put(item, True, self.async_queue_put_timeout) - except Full: - raise AsyncProducerQueueFull( - msg[idx:], - 'Producer async queue overfilled. ' - 'Current queue size %d.' % (self.queue.qsize(),)) - resp = [] - else: - messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel) - req = ProduceRequestPayload(topic, partition, messages) - try: - resp = self.client.send_produce_request( - [req], acks=self.req_acks, timeout=self.ack_timeout, - fail_on_error=self.sync_fail_on_error - ) - except Exception: - log.exception("Unable to send messages") - raise - return resp - - def stop(self, timeout=None): - """ - Stop the producer (async mode). Blocks until async thread completes. - """ - if timeout is not None: - log.warning('timeout argument to stop() is deprecated - ' - 'it will be removed in future release') - - if not self.async_send: - log.warning('producer.stop() called, but producer is not async') - return - - if self.stopped: - log.warning('producer.stop() called, but producer is already stopped') - return - - if self.async_send: - self.queue.put((STOP_ASYNC_PRODUCER, None, None)) - self.thread_stop_event.set() - self.thread.join() - - if hasattr(self, '_cleanup_func'): - # Remove cleanup handler now that we've stopped - - # py3 supports unregistering - if hasattr(atexit, 'unregister'): - atexit.unregister(self._cleanup_func) # pylint: disable=no-member - - # py2 requires removing from private attribute... - else: - - # ValueError on list.remove() if the exithandler no longer exists - # but that is fine here - try: - atexit._exithandlers.remove( # pylint: disable=no-member - (self._cleanup_func, (self,), {})) - except ValueError: - pass - - del self._cleanup_func - - self.stopped = True - - def __del__(self): - if self.async_send and not self.stopped: - self.stop() diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py deleted file mode 100644 index 3ba9216..0000000 --- a/kafka/producer/keyed.py +++ /dev/null @@ -1,49 +0,0 @@ -from __future__ import absolute_import - -import logging -import warnings - -from kafka.producer.base import Producer -from kafka.partitioner import HashedPartitioner - - -log = logging.getLogger(__name__) - - -class KeyedProducer(Producer): - """ - A producer which distributes messages to partitions based on the key - - See Producer class for Arguments - - Additional Arguments: - partitioner: A partitioner class that will be used to get the partition - to send the message to. Must be derived from Partitioner. - Defaults to HashedPartitioner. - """ - def __init__(self, *args, **kwargs): - self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner) - self.partitioners = {} - super(KeyedProducer, self).__init__(*args, **kwargs) - - def _next_partition(self, topic, key): - if topic not in self.partitioners: - if not self.client.has_metadata_for_topic(topic): - self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True) - - self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic)) - - partitioner = self.partitioners[topic] - return partitioner.partition(key) - - def send_messages(self, topic, key, *msg): - partition = self._next_partition(topic, key) - return self._send_messages(topic, partition, *msg, key=key) - - # DEPRECATED - def send(self, topic, key, msg): - warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning) - return self.send_messages(topic, key, msg) - - def __repr__(self): - return '<KeyedProducer batch=%s>' % (self.async_send,) diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py deleted file mode 100644 index f334a49..0000000 --- a/kafka/producer/simple.py +++ /dev/null @@ -1,54 +0,0 @@ -from __future__ import absolute_import - -from itertools import cycle -import logging -import random - -from kafka.vendor.six.moves import range - -from kafka.producer.base import Producer - - -log = logging.getLogger(__name__) - - -class SimpleProducer(Producer): - """A simple, round-robin producer. - - See Producer class for Base Arguments - - Additional Arguments: - random_start (bool, optional): randomize the initial partition which - the first message block will be published to, otherwise - if false, the first message block will always publish - to partition 0 before cycling through each partition, - defaults to True. - """ - def __init__(self, *args, **kwargs): - self.partition_cycles = {} - self.random_start = kwargs.pop('random_start', True) - super(SimpleProducer, self).__init__(*args, **kwargs) - - def _next_partition(self, topic): - if topic not in self.partition_cycles: - if not self.client.has_metadata_for_topic(topic): - self.client.ensure_topic_exists(topic) - - self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic)) - - # Randomize the initial partition that is returned - if self.random_start: - num_partitions = len(self.client.get_partition_ids_for_topic(topic)) - for _ in range(random.randint(0, num_partitions-1)): - next(self.partition_cycles[topic]) - - return next(self.partition_cycles[topic]) - - def send_messages(self, topic, *msg): - partition = self._next_partition(topic) - return super(SimpleProducer, self).send_messages( - topic, partition, *msg - ) - - def __repr__(self): - return '<SimpleProducer batch=%s>' % (self.async_send,) diff --git a/kafka/protocol/__init__.py b/kafka/protocol/__init__.py index 8cf5640..26dcc78 100644 --- a/kafka/protocol/__init__.py +++ b/kafka/protocol/__init__.py @@ -1,11 +1,5 @@ from __future__ import absolute_import -from kafka.protocol.legacy import ( - create_message, create_gzip_message, - create_snappy_message, create_message_set, - CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS, - ATTRIBUTE_CODEC_MASK, KafkaProtocol, -) API_KEYS = { 0: 'Produce', diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py deleted file mode 100644 index 2e8f5bc..0000000 --- a/kafka/protocol/legacy.py +++ /dev/null @@ -1,474 +0,0 @@ -from __future__ import absolute_import - -import logging -import struct - -from kafka.vendor import six # pylint: disable=import-error - -import kafka.protocol.commit -import kafka.protocol.fetch -import kafka.protocol.message -import kafka.protocol.metadata -import kafka.protocol.offset -import kafka.protocol.produce -import kafka.structs - -from kafka.codec import gzip_encode, snappy_encode -from kafka.errors import ProtocolError, UnsupportedCodecError -from kafka.util import ( - crc32, read_short_string, relative_unpack, - write_int_string, group_by_topic_and_partition) -from kafka.protocol.message import MessageSet - - -log = logging.getLogger(__name__) - -ATTRIBUTE_CODEC_MASK = 0x03 -CODEC_NONE = 0x00 -CODEC_GZIP = 0x01 -CODEC_SNAPPY = 0x02 -ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY) - - -class KafkaProtocol(object): - """ - Class to encapsulate all of the protocol encoding/decoding. - This class does not have any state associated with it, it is purely - for organization. - """ - PRODUCE_KEY = 0 - FETCH_KEY = 1 - OFFSET_KEY = 2 - METADATA_KEY = 3 - OFFSET_COMMIT_KEY = 8 - OFFSET_FETCH_KEY = 9 - CONSUMER_METADATA_KEY = 10 - - ################### - # Private API # - ################### - - @classmethod - def _encode_message_header(cls, client_id, correlation_id, request_key, - version=0): - """ - Encode the common request envelope - """ - return struct.pack('>hhih%ds' % len(client_id), - request_key, # ApiKey - version, # ApiVersion - correlation_id, # CorrelationId - len(client_id), # ClientId size - client_id) # ClientId - - @classmethod - def _encode_message_set(cls, messages): - """ - Encode a MessageSet. Unlike other arrays in the protocol, - MessageSets are not length-prefixed - - Format - ====== - MessageSet => [Offset MessageSize Message] - Offset => int64 - MessageSize => int32 - """ - message_set = [] - for message in messages: - encoded_message = KafkaProtocol._encode_message(message) - message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0, - len(encoded_message), - encoded_message)) - return b''.join(message_set) - - @classmethod - def _encode_message(cls, message): - """ - Encode a single message. - - The magic number of a message is a format version number. - The only supported magic number right now is zero - - Format - ====== - Message => Crc MagicByte Attributes Key Value - Crc => int32 - MagicByte => int8 - Attributes => int8 - Key => bytes - Value => bytes - """ - if message.magic == 0: - msg = b''.join([ - struct.pack('>BB', message.magic, message.attributes), - write_int_string(message.key), - write_int_string(message.value) - ]) - crc = crc32(msg) - msg = struct.pack('>i%ds' % len(msg), crc, msg) - else: - raise ProtocolError("Unexpected magic number: %d" % message.magic) - return msg - - ################## - # Public API # - ################## - - @classmethod - def encode_produce_request(cls, payloads=(), acks=1, timeout=1000): - """ - Encode a ProduceRequest struct - - Arguments: - payloads: list of ProduceRequestPayload - acks: How "acky" you want the request to be - 1: written to disk by the leader - 0: immediate response - -1: waits for all replicas to be in sync - timeout: Maximum time (in ms) the server will wait for replica acks. - This is _not_ a socket timeout - - Returns: ProduceRequest - """ - if acks not in (1, 0, -1): - raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks) - - topics = [] - for topic, topic_payloads in group_by_topic_and_partition(payloads).items(): - topic_msgs = [] - for partition, payload in topic_payloads.items(): - partition_msgs = [] - for msg in payload.messages: - m = kafka.protocol.message.Message( - msg.value, key=msg.key, - magic=msg.magic, attributes=msg.attributes - ) - partition_msgs.append((0, m.encode())) - topic_msgs.append((partition, MessageSet.encode(partition_msgs, prepend_size=False))) - topics.append((topic, topic_msgs)) - - - return kafka.protocol.produce.ProduceRequest[0]( - required_acks=acks, - timeout=timeout, - topics=topics - ) - - @classmethod - def decode_produce_response(cls, response): - """ - Decode ProduceResponse to ProduceResponsePayload - - Arguments: - response: ProduceResponse - - Return: list of ProduceResponsePayload - """ - return [ - kafka.structs.ProduceResponsePayload(topic, partition, error, offset) - for topic, partitions in response.topics - for partition, error, offset in partitions - ] - - @classmethod - def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096): - """ - Encodes a FetchRequest struct - - Arguments: - payloads: list of FetchRequestPayload - max_wait_time (int, optional): ms to block waiting for min_bytes - data. Defaults to 100. - min_bytes (int, optional): minimum bytes required to return before - max_wait_time. Defaults to 4096. - - Return: FetchRequest - """ - return kafka.protocol.fetch.FetchRequest[0]( - replica_id=-1, - max_wait_time=max_wait_time, - min_bytes=min_bytes, - topics=[( - topic, - [( - partition, - payload.offset, - payload.max_bytes) - for partition, payload in topic_payloads.items()]) - for topic, topic_payloads in group_by_topic_and_partition(payloads).items()]) - - @classmethod - def decode_fetch_response(cls, response): - """ - Decode FetchResponse struct to FetchResponsePayloads - - Arguments: - response: FetchResponse - """ - return [ - kafka.structs.FetchResponsePayload( - topic, partition, error, highwater_offset, [ - offset_and_msg - for offset_and_msg in cls.decode_message_set(messages)]) - for topic, partitions in response.topics - for partition, error, highwater_offset, messages in partitions - ] - - @classmethod - def decode_message_set(cls, raw_data): - messages = MessageSet.decode(raw_data, bytes_to_read=len(raw_data)) - for offset, _, message in messages: - if isinstance(message, kafka.protocol.message.Message) and message.is_compressed(): - inner_messages = message.decompress() - for (inner_offset, _msg_size, inner_msg) in inner_messages: - yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg) - else: - yield kafka.structs.OffsetAndMessage(offset, message) - - @classmethod - def encode_offset_request(cls, payloads=()): - return kafka.protocol.offset.OffsetRequest[0]( - replica_id=-1, - topics=[( - topic, - [( - partition, - payload.time, - payload.max_offsets) - for partition, payload in six.iteritems(topic_payloads)]) - for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) - - @classmethod - def decode_offset_response(cls, response): - """ - Decode OffsetResponse into OffsetResponsePayloads - - Arguments: - response: OffsetResponse - - Returns: list of OffsetResponsePayloads - """ - return [ - kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets)) - for topic, partitions in response.topics - for partition, error, offsets in partitions - ] - - @classmethod - def encode_list_offset_request(cls, payloads=()): - return kafka.protocol.offset.OffsetRequest[1]( - replica_id=-1, - topics=[( - topic, - [( - partition, - payload.time) - for partition, payload in six.iteritems(topic_payloads)]) - for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) - - @classmethod - def decode_list_offset_response(cls, response): - """ - Decode OffsetResponse_v2 into ListOffsetResponsePayloads - - Arguments: - response: OffsetResponse_v2 - - Returns: list of ListOffsetResponsePayloads - """ - return [ - kafka.structs.ListOffsetResponsePayload(topic, partition, error, timestamp, offset) - for topic, partitions in response.topics - for partition, error, timestamp, offset in partitions - ] - - - @classmethod - def encode_metadata_request(cls, topics=(), payloads=None): - """ - Encode a MetadataRequest - - Arguments: - topics: list of strings - """ - if payloads is not None: - topics = payloads - - return kafka.protocol.metadata.MetadataRequest[0](topics) - - @classmethod - def decode_metadata_response(cls, response): - return response - - @classmethod - def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads): - """ - Encode a ConsumerMetadataRequest - - Arguments: - client_id: string - correlation_id: int - payloads: string (consumer group) - """ - message = [] - message.append(cls._encode_message_header(client_id, correlation_id, - KafkaProtocol.CONSUMER_METADATA_KEY)) - message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads)) - - msg = b''.join(message) - return write_int_string(msg) - - @classmethod - def decode_consumer_metadata_response(cls, data): - """ - Decode bytes to a kafka.structs.ConsumerMetadataResponse - - Arguments: - data: bytes to decode - """ - ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0) - (host, cur) = read_short_string(data, cur) - ((port,), cur) = relative_unpack('>i', data, cur) - - return kafka.structs.ConsumerMetadataResponse(error, nodeId, host, port) - - @classmethod - def encode_offset_commit_request(cls, group, payloads): - """ - Encode an OffsetCommitRequest struct - - Arguments: - group: string, the consumer group you are committing offsets for - payloads: list of OffsetCommitRequestPayload - """ - return kafka.protocol.commit.OffsetCommitRequest[0]( - consumer_group=group, - topics=[( - topic, - [( - partition, - payload.offset, - payload.metadata) - for partition, payload in six.iteritems(topic_payloads)]) - for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) - - @classmethod - def decode_offset_commit_response(cls, response): - """ - Decode OffsetCommitResponse to an OffsetCommitResponsePayload - - Arguments: - response: OffsetCommitResponse - """ - return [ - kafka.structs.OffsetCommitResponsePayload(topic, partition, error) - for topic, partitions in response.topics - for partition, error in partitions - ] - - @classmethod - def encode_offset_fetch_request(cls, group, payloads, from_kafka=False): - """ - Encode an OffsetFetchRequest struct. The request is encoded using - version 0 if from_kafka is false, indicating a request for Zookeeper - offsets. It is encoded using version 1 otherwise, indicating a request - for Kafka offsets. - - Arguments: - group: string, the consumer group you are fetching offsets for - payloads: list of OffsetFetchRequestPayload - from_kafka: bool, default False, set True for Kafka-committed offsets - """ - version = 1 if from_kafka else 0 - return kafka.protocol.commit.OffsetFetchRequest[version]( - consumer_group=group, - topics=[( - topic, - list(topic_payloads.keys())) - for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) - - @classmethod - def decode_offset_fetch_response(cls, response): - """ - Decode OffsetFetchResponse to OffsetFetchResponsePayloads - - Arguments: - response: OffsetFetchResponse - """ - return [ - kafka.structs.OffsetFetchResponsePayload( - topic, partition, offset, metadata, error - ) - for topic, partitions in response.topics - for partition, offset, metadata, error in partitions - ] - - -def create_message(payload, key=None): - """ - Construct a Message - - Arguments: - payload: bytes, the payload to send to Kafka - key: bytes, a key used for partition routing (optional) - - """ - return kafka.structs.Message(0, 0, key, payload) - - -def create_gzip_message(payloads, key=None, compresslevel=None): - """ - Construct a Gzipped Message containing multiple Messages - - The given payloads will be encoded, compressed, and sent as a single atomic - message to Kafka. - - Arguments: - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) - - """ - message_set = KafkaProtocol._encode_message_set( - [create_message(payload, pl_key) for payload, pl_key in payloads]) - - gzipped = gzip_encode(message_set, compresslevel=compresslevel) - codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP - - return kafka.structs.Message(0, 0x00 | codec, key, gzipped) - - -def create_snappy_message(payloads, key=None): - """ - Construct a Snappy Message containing multiple Messages - - The given payloads will be encoded, compressed, and sent as a single atomic - message to Kafka. - - Arguments: - payloads: list(bytes), a list of payload to send be sent to Kafka - key: bytes, a key used for partition routing (optional) - - """ - message_set = KafkaProtocol._encode_message_set( - [create_message(payload, pl_key) for payload, pl_key in payloads]) - - snapped = snappy_encode(message_set) - codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY - - return kafka.structs.Message(0, 0x00 | codec, key, snapped) - - -def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None): - """Create a message set using the given codec. - - If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise, - return a list containing a single codec-encoded message. - """ - if codec == CODEC_NONE: - return [create_message(m, k) for m, k in messages] - elif codec == CODEC_GZIP: - return [create_gzip_message(messages, key, compresslevel)] - elif codec == CODEC_SNAPPY: - return [create_snappy_message(messages, key)] - else: - raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,)) diff --git a/kafka/structs.py b/kafka/structs.py index baacbcd..9ab4f8b 100644 --- a/kafka/structs.py +++ b/kafka/structs.py @@ -3,64 +3,6 @@ from __future__ import absolute_import from collections import namedtuple -# SimpleClient Payload Structs - Deprecated - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI -MetadataRequest = namedtuple("MetadataRequest", - ["topics"]) - -MetadataResponse = namedtuple("MetadataResponse", - ["brokers", "topics"]) - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest -ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest", - ["groups"]) - -ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse", - ["error", "nodeId", "host", "port"]) - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI -ProduceRequestPayload = namedtuple("ProduceRequestPayload", - ["topic", "partition", "messages"]) - -ProduceResponsePayload = namedtuple("ProduceResponsePayload", - ["topic", "partition", "error", "offset"]) - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI -FetchRequestPayload = namedtuple("FetchRequestPayload", - ["topic", "partition", "offset", "max_bytes"]) - -FetchResponsePayload = namedtuple("FetchResponsePayload", - ["topic", "partition", "error", "highwaterMark", "messages"]) - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI -OffsetRequestPayload = namedtuple("OffsetRequestPayload", - ["topic", "partition", "time", "max_offsets"]) - -ListOffsetRequestPayload = namedtuple("ListOffsetRequestPayload", - ["topic", "partition", "time"]) - -OffsetResponsePayload = namedtuple("OffsetResponsePayload", - ["topic", "partition", "error", "offsets"]) - -ListOffsetResponsePayload = namedtuple("ListOffsetResponsePayload", - ["topic", "partition", "error", "timestamp", "offset"]) - -# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI -OffsetCommitRequestPayload = namedtuple("OffsetCommitRequestPayload", - ["topic", "partition", "offset", "metadata"]) - -OffsetCommitResponsePayload = namedtuple("OffsetCommitResponsePayload", - ["topic", "partition", "error"]) - -OffsetFetchRequestPayload = namedtuple("OffsetFetchRequestPayload", - ["topic", "partition"]) - -OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload", - ["topic", "partition", "offset", "metadata", "error"]) - - - # Other useful structs TopicPartition = namedtuple("TopicPartition", ["topic", "partition"]) @@ -79,17 +21,6 @@ OffsetAndTimestamp = namedtuple("OffsetAndTimestamp", ["offset", "timestamp"]) -# Deprecated structs -OffsetAndMessage = namedtuple("OffsetAndMessage", - ["offset", "message"]) - -Message = namedtuple("Message", - ["magic", "attributes", "key", "value"]) - -KafkaMessage = namedtuple("KafkaMessage", - ["topic", "partition", "offset", "key", "value"]) - - # Define retry policy for async producer # Limit value: int >= 0, 0 means no retries RetryOptions = namedtuple("RetryOptions", diff --git a/kafka/util.py b/kafka/util.py index 9354bd9..9f65b81 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -2,15 +2,10 @@ from __future__ import absolute_import import atexit import binascii -import collections -import struct -from threading import Thread, Event import weakref from kafka.vendor import six -from kafka.errors import BufferUnderflowError - if six.PY3: MAX_INT = 2 ** 31 @@ -28,109 +23,6 @@ else: from binascii import crc32 -def write_int_string(s): - if s is not None and not isinstance(s, six.binary_type): - raise TypeError('Expected "%s" to be bytes\n' - 'data=%s' % (type(s), repr(s))) - if s is None: - return struct.pack('>i', -1) - else: - return struct.pack('>i%ds' % len(s), len(s), s) - - -def read_short_string(data, cur): - if len(data) < cur + 2: - raise BufferUnderflowError("Not enough data left") - - (strlen,) = struct.unpack('>h', data[cur:cur + 2]) - if strlen == -1: - return None, cur + 2 - - cur += 2 - if len(data) < cur + strlen: - raise BufferUnderflowError("Not enough data left") - - out = data[cur:cur + strlen] - return out, cur + strlen - - -def relative_unpack(fmt, data, cur): - size = struct.calcsize(fmt) - if len(data) < cur + size: - raise BufferUnderflowError("Not enough data left") - - out = struct.unpack(fmt, data[cur:cur + size]) - return out, cur + size - - -def group_by_topic_and_partition(tuples): - out = collections.defaultdict(dict) - for t in tuples: - assert t.topic not in out or t.partition not in out[t.topic], \ - 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__, - t.topic, t.partition) - out[t.topic][t.partition] = t - return out - - -class ReentrantTimer(object): - """ - A timer that can be restarted, unlike threading.Timer - (although this uses threading.Timer) - - Arguments: - - t: timer interval in milliseconds - fn: a callable to invoke - args: tuple of args to be passed to function - kwargs: keyword arguments to be passed to function - """ - def __init__(self, t, fn, *args, **kwargs): - - if t <= 0: - raise ValueError('Invalid timeout value') - - if not callable(fn): - raise ValueError('fn must be callable') - - self.thread = None - self.t = t / 1000.0 - self.fn = fn - self.args = args - self.kwargs = kwargs - self.active = None - - def _timer(self, active): - # python2.6 Event.wait() always returns None - # python2.7 and greater returns the flag value (true/false) - # we want the flag value, so add an 'or' here for python2.6 - # this is redundant for later python versions (FLAG OR FLAG == FLAG) - while not (active.wait(self.t) or active.is_set()): - self.fn(*self.args, **self.kwargs) - - def start(self): - if self.thread is not None: - self.stop() - - self.active = Event() - self.thread = Thread(target=self._timer, args=(self.active,)) - self.thread.daemon = True # So the app exits when main thread exits - self.thread.start() - - def stop(self): - if self.thread is None: - return - - self.active.set() - self.thread.join(self.t + 1) - # noinspection PyAttributeOutsideInit - self.timer = None - self.fn = None - - def __del__(self): - self.stop() - - class WeakMethod(object): """ Callable that weakly references a method and the object it is bound to. It |