author    | Mahendra M <mahendra.m@gmail.com> | 2013-05-31 14:57:57 +0530
committer | David Arthur <mumrah@gmail.com>   | 2013-06-04 09:16:29 -0400
commit    | 6704050e50a2934806e349ac875f2ab31ccfcd9d (patch)
tree      | c5ebdcb8bfde25c84078dd919bf1c6b619effe63
parent    | 58288d9f86a67772326cf946d0ff4729a764ad52 (diff)
download  | kafka-python-6704050e50a2934806e349ac875f2ab31ccfcd9d.tar.gz
Finish making remaining files pep8 ready
-rw-r--r-- | kafka/client.py   | 169
-rw-r--r-- | kafka/consumer.py |  93
2 files changed, 168 insertions, 94 deletions
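The refactor below repeatedly swaps a long inline call for a `functools.partial` that pre-binds the constant protocol arguments (`acks`, `timeout`, `group`) before the encoder is handed to `_send_broker_aware_request(payloads, encoder, decoder)`. A minimal standalone sketch of that pattern, with illustrative names that are not taken from the diff:

```python
from functools import partial

def encode_request(client_id, correlation_id, payloads, acks=1, timeout=1000):
    # Stand-in for an encoder such as KafkaProtocol.encode_produce_request:
    # it builds a request body from the payloads plus the protocol options.
    return "%s:%d acks=%d timeout=%d payloads=%d" % (
        client_id, correlation_id, acks, timeout, len(payloads))

# Pre-bind the options that stay constant for the whole request, so the
# caller only supplies client_id, correlation_id and payloads, the exact
# keyword arguments _send_broker_aware_request passes in.
encoder = partial(encode_request, acks=0, timeout=500)

print(encoder(client_id="kafka-python", correlation_id=1, payloads=["msg"]))
```

Binding the constants up front is what lets `send_produce_request`, `send_offset_commit_request`, and `send_offset_fetch_request` all share the same `_send_broker_aware_request(payloads, encoder, decoder)` call shape.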
diff --git a/kafka/client.py b/kafka/client.py
index eb2c25c..5595d49 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -10,43 +10,47 @@ import zlib
 from kafka.common import *
 from kafka.conn import KafkaConnection
-from kafka.protocol import KafkaProtocol
+from kafka.protocol import KafkaProtocol
 
 log = logging.getLogger("kafka")
 
+
 class KafkaClient(object):
 
     CLIENT_ID = "kafka-python"
-    ID_GEN = count()
+    ID_GEN = count()
 
     def __init__(self, host, port, bufsize=4096):
-        # We need one connection to bootstrap
+        # We need one connection to bootstrap
        self.bufsize = bufsize
-        self.conns = {          # (host, port) -> KafkaConnection
+        self.conns = {          # (host, port) -> KafkaConnection
             (host, port): KafkaConnection(host, port, bufsize)
-        }
-        self.brokers = {}           # broker_id -> BrokerMetadata
-        self.topics_to_brokers = {} # topic_id -> broker_id
-        self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
+        }
+        self.brokers = {}            # broker_id -> BrokerMetadata
+        self.topics_to_brokers = {}  # topic_id -> broker_id
+        self.topic_partitions = defaultdict(list)  # topic_id -> [0, 1, 2, ...]
         self._load_metadata_for_topics()
 
     ##################
     #   Private API  #
     ##################
-
     def _get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
+            self.conns[(broker.host, broker.port)] = \
+                KafkaConnection(broker.host, broker.port, self.bufsize)
+
         return self.conns[(broker.host, broker.port)]
 
     def _get_leader_for_partition(self, topic, partition):
         key = TopicAndPartition(topic, partition)
         if key not in self.topics_to_brokers:
             self._load_metadata_for_topics(topic)
+
         if key not in self.topics_to_brokers:
             raise Exception("Partition does not exist: %s" % str(key))
+
         return self.topics_to_brokers[key]
 
     def _load_metadata_for_topics(self, *topics):
@@ -55,13 +59,18 @@ class KafkaClient(object):
         recurse in the event of a retry.
         """
         requestId = self._next_id()
-        request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics)
+        request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID,
+                                                        requestId, topics)
+
         response = self._send_broker_unaware_request(requestId, request)
         if response is None:
             raise Exception("All servers failed to process request")
+
         (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+
         log.debug("Broker metadata: %s", brokers)
         log.debug("Topic metadata: %s", topics)
+
         self.brokers.update(brokers)
         self.topics_to_brokers = {}
         for topic, partitions in topics.items():
@@ -77,7 +86,8 @@ class KafkaClient(object):
                     time.sleep(1)
                     self._load_metadata_for_topics(topic)
                 else:
-                    self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
+                    topic_part = TopicAndPartition(topic, partition)
+                    self.topics_to_brokers[topic_part] = brokers[meta.leader]
                     self.topic_partitions[topic].append(partition)
 
     def _next_id(self):
@@ -86,8 +96,8 @@ class KafkaClient(object):
 
     def _send_broker_unaware_request(self, requestId, request):
         """
-        Attempt to send a broker-agnostic request to one of the available brokers.
-        Keep trying until you succeed.
+        Attempt to send a broker-agnostic request to one of the available
+        brokers. Keep trying until you succeed.
         """
         for conn in self.conns.values():
             try:
@@ -95,32 +105,43 @@ class KafkaClient(object):
                 response = conn.recv(requestId)
                 return response
             except Exception, e:
-                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
+                log.warning("Could not send request [%r] to server %s, "
+                            "trying next server: %s" % (request, conn, e))
                 continue
+
         return None
 
     def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         """
-        Group a list of request payloads by topic+partition and send them to the
-        leader broker for that partition using the supplied encode/decode functions
+        Group a list of request payloads by topic+partition and send them to
+        the leader broker for that partition using the supplied encode/decode
+        functions
 
         Params
         ======
-        payloads: list of object-like entities with a topic and partition attribute
-        encode_fn: a method to encode the list of payloads to a request body, must accept
-                   client_id, correlation_id, and payloads as keyword arguments
-        decode_fn: a method to decode a response body into response objects. The response
-                   objects must be object-like and have topic and partition attributes
+        payloads: list of object-like entities with a topic and
+                  partition attribute
+        encode_fn: a method to encode the list of payloads to a request body,
+                   must accept client_id, correlation_id, and payloads as
+                   keyword arguments
+        decode_fn: a method to decode a response body into response objects.
+                   The response objects must be object-like and have topic
+                   and partition attributes
 
         Return
         ======
         List of response objects in the same order as the supplied payloads
         """
+        # Group the requests by topic+partition
         original_keys = []
         payloads_by_broker = defaultdict(list)
+
         for payload in payloads:
-            payloads_by_broker[self._get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            leader = self._get_leader_for_partition(payload.topic,
+                                                    payload.partition)
+
+            payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))
 
         # Accumulate the responses in a dictionary
@@ -130,7 +151,8 @@ class KafkaClient(object):
         for broker, payloads in payloads_by_broker.items():
             conn = self._get_conn_for_broker(broker)
             requestId = self._next_id()
-            request = encoder_fn(client_id=KafkaClient.CLIENT_ID, correlation_id=requestId, payloads=payloads)
+            request = encoder_fn(client_id=KafkaClient.CLIENT_ID,
+                                 correlation_id=requestId, payloads=payloads)
 
             # Send the request, recv the response
             conn.send(requestId, request)
@@ -149,33 +171,43 @@ class KafkaClient(object):
         for conn in self.conns.values():
             conn.close()
 
-    def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None):
+    def send_produce_request(self, payloads=[], acks=1, timeout=1000,
+                             fail_on_error=True, callback=None):
         """
         Encode and send some ProduceRequests
 
-        ProduceRequests will be grouped by (topic, partition) and then sent to a specific
-        broker. Output is a list of responses in the same order as the list of payloads
-        specified
+        ProduceRequests will be grouped by (topic, partition) and then
+        sent to a specific broker. Output is a list of responses in the
+        same order as the list of payloads specified
 
         Params
         ======
         payloads: list of ProduceRequest
-        fail_on_error: boolean, should we raise an Exception if we encounter an API error?
-        callback: function, instead of returning the ProduceResponse, first pass it through this function
+        fail_on_error: boolean, should we raise an Exception if we
+                       encounter an API error?
+        callback: function, instead of returning the ProduceResponse,
+                  first pass it through this function
 
         Return
         ======
-        list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
+        list of ProduceResponse or callback(ProduceResponse), in the
+        order of input payloads
         """
-        resps = self._send_broker_aware_request(payloads,
-                partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout),
-                KafkaProtocol.decode_produce_response)
+
+        encoder = partial(KafkaProtocol.encode_produce_request,
+                          acks=acks, timeout=timeout)
+        decoder = KafkaProtocol.decode_produce_response
+        resps = self._send_broker_aware_request(payloads, encoder, decoder)
+
         out = []
         for resp in resps:
             # Check for errors
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("ProduceRequest for %s failed with errorcode=%d" %
-                        (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("ProduceRequest for %s failed with "
+                                "errorcode=%d" % (
+                                    TopicAndPartition(resp.topic, resp.partition),
+                                    resp.error))
+
             # Run the callback
             if callback is not None:
                 out.append(callback(resp))
@@ -183,22 +215,27 @@ class KafkaClient(object):
                 out.append(resp)
         return out
 
-    def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
+    def send_fetch_request(self, payloads=[], fail_on_error=True,
+                           callback=None):
         """
         Encode and send a FetchRequest
-
-        Payloads are grouped by topic and partition so they can be pipelined to the same
-        brokers.
+
+        Payloads are grouped by topic and partition so they can be pipelined
+        to the same brokers.
         """
         resps = self._send_broker_aware_request(payloads,
                                                 KafkaProtocol.encode_fetch_request,
                                                 KafkaProtocol.decode_fetch_response)
+
         out = []
         for resp in resps:
             # Check for errors
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("FetchRequest for %s failed with errorcode=%d" %
-                        (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("FetchRequest for %s failed with "
+                                "errorcode=%d" % (
+                                    TopicAndPartition(resp.topic, resp.partition),
+                                    resp.error))
+
             # Run the callback
             if callback is not None:
                 out.append(callback(resp))
@@ -206,43 +243,55 @@ class KafkaClient(object):
                 out.append(resp)
         return out
 
-
-    def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
+    def send_offset_request(self, payloads=[], fail_on_error=True,
+                            callback=None):
         resps = self._send_broker_aware_request(payloads,
                                                 KafkaProtocol.encode_offset_request,
                                                 KafkaProtocol.decode_offset_response)
+
         out = []
         for resp in resps:
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetRequest failed with errorcode=%s", resp.error)
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetRequest failed with errorcode=%s",
+                                resp.error)
             if callback is not None:
                 out.append(callback(resp))
             else:
                 out.append(resp)
         return out
 
-    def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        resps = self._send_broker_aware_request(payloads,
-                partial(KafkaProtocol.encode_offset_commit_request, group=group),
-                KafkaProtocol.decode_offset_commit_response)
+    def send_offset_commit_request(self, group, payloads=[],
+                                   fail_on_error=True, callback=None):
+        encoder = partial(KafkaProtocol.encode_offset_commit_request,
+                          group=group)
+        decoder = KafkaProtocol.decode_offset_commit_response
+        resps = self._send_broker_aware_request(payloads, encoder, decoder)
+
         out = []
         for resp in resps:
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with "
+                                "errorcode=%s", resp.error)
+
            if callback is not None:
                 out.append(callback(resp))
             else:
                 out.append(resp)
         return out
 
-    def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        resps = self._send_broker_aware_request(payloads,
-                partial(KafkaProtocol.encode_offset_fetch_request, group=group),
-                KafkaProtocol.decode_offset_fetch_response)
+    def send_offset_fetch_request(self, group, payloads=[],
+                                  fail_on_error=True, callback=None):
+
+        encoder = partial(KafkaProtocol.encode_offset_fetch_request,
+                          group=group)
+        decoder = KafkaProtocol.decode_offset_fetch_response
+        resps = self._send_broker_aware_request(payloads, encoder, decoder)
+
         out = []
         for resp in resps:
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with errorcode=%s",
+                                resp.error)
             if callback is not None:
                 out.append(callback(resp))
             else:
                 out.append(resp)
diff --git a/kafka/consumer.py b/kafka/consumer.py
index d09803a..3b64571 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -16,6 +16,7 @@ log = logging.getLogger("kafka")
 AUTO_COMMIT_MSG_COUNT = 100
 AUTO_COMMIT_INTERVAL = 5000
 
+
 class SimpleConsumer(object):
     """
     A simple consumer implementation that consumes all partitions for a topic
@@ -25,13 +26,16 @@ class SimpleConsumer(object):
     topic: the topic to consume
 
     auto_commit: default True. Whether or not to auto commit the offsets
-    auto_commit_every_n: default 100. How many messages to consume before a commit
-    auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit
+    auto_commit_every_n: default 100. How many messages to consume
+                         before a commit
+    auto_commit_every_t: default 5000. How much time (in milliseconds) to
+                         wait before commit
 
     Auto commit details:
-    If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another
-    when one is triggered. These triggers simply call the commit method on this class. A
-    manual call to commit will also reset these triggers
+    If both auto_commit_every_n and auto_commit_every_t are set, they will
+    reset one another when one is triggered. These triggers simply call the
+    commit method on this class. A manual call to commit will also reset
+    these triggers
     """
     def __init__(self, client, group, topic, auto_commit=True,
@@ -63,17 +67,19 @@ class SimpleConsumer(object):
             elif resp.error == ErrorMapping.UNKNOWN_TOPIC_OR_PARTITON:
                 return 0
             else:
-                raise Exception("OffsetFetchRequest for topic=%s, partition=%d failed with errorcode=%s" % (
-                    resp.topic, resp.partition, resp.error))
+                raise Exception("OffsetFetchRequest for topic=%s, "
+                                "partition=%d failed with errorcode=%s" % (
+                                    resp.topic, resp.partition, resp.error))
 
         # Uncomment for 0.8.1
         #
         #for partition in self.client.topic_partitions[topic]:
         #    req = OffsetFetchRequest(topic, partition)
         #    (offset,) = self.client.send_offset_fetch_request(group, [req],
-        #                  callback=get_or_init_offset_callback, fail_on_error=False)
+        #                  callback=get_or_init_offset_callback,
+        #                  fail_on_error=False)
         #    self.offsets[partition] = offset
-
+
         for partition in self.client.topic_partitions[topic]:
             self.offsets[partition] = 0
 
@@ -87,14 +93,16 @@ class SimpleConsumer(object):
         1 is relative to the current offset
         2 is relative to the latest known offset (tail)
         """
-        if whence == 1: # relative to current position
+        if whence == 1:  # relative to current position
             for partition, _offset in self.offsets.items():
                 self.offsets[partition] = _offset + offset
-        elif whence in (0, 2): # relative to beginning or end
-            # divide the request offset by number of partitions, distribute the remained evenly
+        elif whence in (0, 2):  # relative to beginning or end
+            # divide the request offset by number of partitions,
+            # distribute the remained evenly
             (delta, rem) = divmod(offset, len(self.offsets))
             deltas = {}
-            for partition, r in izip_longest(self.offsets.keys(), repeat(1, rem), fillvalue=0):
+            for partition, r in izip_longest(self.offsets.keys(),
+                                             repeat(1, rem), fillvalue=0):
                 deltas[partition] = delta + r
 
             reqs = []
@@ -108,7 +116,8 @@ class SimpleConsumer(object):
 
             resps = self.client.send_offset_request(reqs)
             for resp in resps:
-                self.offsets[resp.partition] = resp.offsets[0] + deltas[resp.partition]
+                self.offsets[resp.partition] = resp.offsets[0] + \
+                    deltas[resp.partition]
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)
 
@@ -149,24 +158,24 @@ class SimpleConsumer(object):
         """
         Commit offsets for this consumer
 
-        partitions: list of partitions to commit, default is to commit all of them
+        partitions: list of partitions to commit, default is to commit
+                    all of them
         """
-
         # short circuit if nothing happened
         if self.count_since_commit == 0:
             return
 
         with self.commit_lock:
             reqs = []
-            if len(partitions) == 0: # commit all partitions
+            if len(partitions) == 0:  # commit all partitions
                 partitions = self.offsets.keys()
 
             for partition in partitions:
                 offset = self.offsets[partition]
                 log.debug("Commit offset %d in SimpleConsumer: "
                           "group=%s, topic=%s, partition=%s" %
-                           (offset, self.group, self.topic, partition))
+                          (offset, self.group, self.topic, partition))
                 reqs.append(OffsetCommitRequest(self.topic, partition,
                                                 offset, None))
@@ -177,10 +186,27 @@ class SimpleConsumer(object):
 
         self.count_since_commit = 0
 
+    def _auto_commit(self):
+        """
+        Check if we have to commit based on number of messages and commit
+        """
+
+        # Check if we are supposed to do an auto-commit
+        if not self.auto_commit or self.auto_commit_every_n is None:
+            return
+
+        if self.count_since_commit > self.auto_commit_every_n:
+            if self.commit_timer is not None:
+                self.commit_timer.stop()
+                self.commit()
+                self.commit_timer.start()
+            else:
+                self.commit()
+
     def __iter__(self):
         """
-        Create an iterate per partition. Iterate through them calling next() until they are
-        all exhausted.
+        Create an iterate per partition. Iterate through them calling next()
+        until they are all exhausted.
         """
         iters = {}
         for partition, offset in self.offsets.items():
@@ -199,31 +225,30 @@ class SimpleConsumer(object):
                 except StopIteration:
                     log.debug("Done iterating over partition %s" % partition)
                     del iters[partition]
-                    continue # skip auto-commit since we didn't yield anything
-                # auto commit logic
+                    # skip auto-commit since we didn't yield anything
+                    continue
+
+                # Count, check and commit messages if necessary
                 self.count_since_commit += 1
-                if self.auto_commit is True:
-                    if self.auto_commit_every_n is not None and self.count_since_commit > self.auto_commit_every_n:
-                        if self.commit_timer is not None:
-                            self.commit_timer.stop()
-                            self.commit()
-                            self.commit_timer.start()
-                        else:
-                            self.commit()
+                self._auto_commit()
 
     def __iter_partition__(self, partition, offset):
         """
-        Iterate over the messages in a partition. Create a FetchRequest to get back
-        a batch of messages, yield them one at a time. After a batch is exhausted,
-        start a new batch unless we've reached the end of ths partition.
+        Iterate over the messages in a partition. Create a FetchRequest
+        to get back a batch of messages, yield them one at a time.
+        After a batch is exhausted, start a new batch unless we've reached
+        the end of this partition.
         """
         while True:
-            req = FetchRequest(self.topic, partition, offset, 1024) # TODO configure fetch size
+            # TODO: configure fetch size
+            req = FetchRequest(self.topic, partition, offset, 1024)
             (resp,) = self.client.send_fetch_request([req])
+
             assert resp.topic == self.topic
             assert resp.partition == partition
+
             next_offset = None
             for message in resp.messages:
                 next_offset = message.offset
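For orientation, a rough end-to-end usage sketch of the two classes touched by this commit, based only on what is visible in the diff: `KafkaClient(host, port, bufsize=4096)`, `SimpleConsumer(client, group, topic, auto_commit=True, ...)` and the consumer's `__iter__`. The broker address, group and topic names are placeholders, the `auto_commit_every_*` keyword names are taken from the docstring, and `client.close()` is assumed from the `conn.close()` loop shown above:

```python
from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# One bootstrap connection; topic metadata is loaded in __init__.
client = KafkaClient("localhost", 9092)

# Iterating the consumer walks every partition of the topic. With
# auto_commit enabled, offsets are committed after auto_commit_every_n
# messages or auto_commit_every_t milliseconds; per the docstring above,
# whichever trigger fires resets the other.
consumer = SimpleConsumer(client, "my-group", "my-topic",
                          auto_commit=True,
                          auto_commit_every_n=100,
                          auto_commit_every_t=5000)

for message in consumer:
    print(message)

client.close()
```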