author | Mahendra M <mahendra.m@gmail.com> | 2013-05-31 14:57:57 +0530
committer | David Arthur <mumrah@gmail.com> | 2013-06-04 09:16:29 -0400
commit | 6704050e50a2934806e349ac875f2ab31ccfcd9d (patch)
tree | c5ebdcb8bfde25c84078dd919bf1c6b619effe63 /kafka/client.py
parent | 58288d9f86a67772326cf946d0ff4729a764ad52 (diff)
download | kafka-python-6704050e50a2934806e349ac875f2ab31ccfcd9d.tar.gz
Finish making remaining files pep8 ready
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 169
1 file changed, 109 insertions, 60 deletions
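The patch is almost entirely mechanical PEP 8 cleanup: over-long lines are wrapped to fit 79 columns (via parenthesized continuations or a trailing backslash), `fail_on_error == True` comparisons become `fail_on_error is True`, and blank lines around classes and methods are normalized; behavior is unchanged. The dominant wrapping pattern, excerpted from `send_produce_request` in the diff below, binds the encoder and decoder to local names instead of nesting a long `partial(...)` call:

```python
# Before: a single call spread over several over-long lines (pep8 E501)
resps = self._send_broker_aware_request(payloads,
        partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout),
        KafkaProtocol.decode_produce_response)

# After: intermediate names keep every line under 79 characters
encoder = partial(KafkaProtocol.encode_produce_request,
                  acks=acks, timeout=timeout)
decoder = KafkaProtocol.decode_produce_response
resps = self._send_broker_aware_request(payloads, encoder, decoder)
```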
```diff
diff --git a/kafka/client.py b/kafka/client.py
index eb2c25c..5595d49 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -10,43 +10,47 @@ import zlib
 
 from kafka.common import *
 from kafka.conn import KafkaConnection
-from kafka.protocol import KafkaProtocol
+from kafka.protocol import KafkaProtocol
 
 log = logging.getLogger("kafka")
 
+
 class KafkaClient(object):
 
     CLIENT_ID = "kafka-python"
-    ID_GEN = count()
+    ID_GEN = count()
 
     def __init__(self, host, port, bufsize=4096):
-        # We need one connection to bootstrap
+        # We need one connection to bootstrap
         self.bufsize = bufsize
-        self.conns = { # (host, port) -> KafkaConnection
+        self.conns = {  # (host, port) -> KafkaConnection
             (host, port): KafkaConnection(host, port, bufsize)
-        }
-        self.brokers = {} # broker_id -> BrokerMetadata
-        self.topics_to_brokers = {} # topic_id -> broker_id
-        self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
+        }
+        self.brokers = {}  # broker_id -> BrokerMetadata
+        self.topics_to_brokers = {}  # topic_id -> broker_id
+        self.topic_partitions = defaultdict(list)  # topic_id -> [0, 1, 2, ...]
         self._load_metadata_for_topics()
 
     ##################
     #   Private API  #
     ##################
 
-
     def _get_conn_for_broker(self, broker):
         "Get or create a connection to a broker"
         if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = KafkaConnection(broker.host, broker.port, self.bufsize)
+            self.conns[(broker.host, broker.port)] = \
+                KafkaConnection(broker.host, broker.port, self.bufsize)
+
         return self.conns[(broker.host, broker.port)]
 
     def _get_leader_for_partition(self, topic, partition):
         key = TopicAndPartition(topic, partition)
         if key not in self.topics_to_brokers:
             self._load_metadata_for_topics(topic)
+
         if key not in self.topics_to_brokers:
             raise Exception("Partition does not exist: %s" % str(key))
+
         return self.topics_to_brokers[key]
 
     def _load_metadata_for_topics(self, *topics):
@@ -55,13 +59,18 @@ class KafkaClient(object):
         recurse in the event of a retry.
         """
         requestId = self._next_id()
-        request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID, requestId, topics)
+        request = KafkaProtocol.encode_metadata_request(KafkaClient.CLIENT_ID,
+                                                        requestId, topics)
+
         response = self._send_broker_unaware_request(requestId, request)
         if response is None:
             raise Exception("All servers failed to process request")
+
         (brokers, topics) = KafkaProtocol.decode_metadata_response(response)
+
         log.debug("Broker metadata: %s", brokers)
         log.debug("Topic metadata: %s", topics)
+
         self.brokers.update(brokers)
         self.topics_to_brokers = {}
         for topic, partitions in topics.items():
@@ -77,7 +86,8 @@ class KafkaClient(object):
                     time.sleep(1)
                     self._load_metadata_for_topics(topic)
                 else:
-                    self.topics_to_brokers[TopicAndPartition(topic, partition)] = brokers[meta.leader]
+                    topic_part = TopicAndPartition(topic, partition)
+                    self.topics_to_brokers[topic_part] = brokers[meta.leader]
                     self.topic_partitions[topic].append(partition)
 
     def _next_id(self):
@@ -86,8 +96,8 @@ class KafkaClient(object):
 
     def _send_broker_unaware_request(self, requestId, request):
         """
-        Attempt to send a broker-agnostic request to one of the available brokers.
-        Keep trying until you succeed.
+        Attempt to send a broker-agnostic request to one of the available
+        brokers. Keep trying until you succeed.
         """
         for conn in self.conns.values():
             try:
@@ -95,32 +105,43 @@ class KafkaClient(object):
                 response = conn.recv(requestId)
                 return response
             except Exception, e:
-                log.warning("Could not send request [%r] to server %s, trying next server: %s" % (request, conn, e))
+                log.warning("Could not send request [%r] to server %s, "
+                            "trying next server: %s" % (request, conn, e))
                 continue
+
         return None
 
     def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         """
-        Group a list of request payloads by topic+partition and send them to the
-        leader broker for that partition using the supplied encode/decode functions
+        Group a list of request payloads by topic+partition and send them to
+        the leader broker for that partition using the supplied encode/decode
+        functions
 
         Params
         ======
-        payloads: list of object-like entities with a topic and partition attribute
-        encode_fn: a method to encode the list of payloads to a request body, must accept
-                   client_id, correlation_id, and payloads as keyword arguments
-        decode_fn: a method to decode a response body into response objects. The response
-                   objects must be object-like and have topic and partition attributes
+        payloads: list of object-like entities with a topic and
+                  partition attribute
+        encode_fn: a method to encode the list of payloads to a request body,
+                   must accept client_id, correlation_id, and payloads as
+                   keyword arguments
+        decode_fn: a method to decode a response body into response objects.
+                   The response objects must be object-like and have topic
+                   and partition attributes
 
         Return
        ======
         List of response objects in the same order as the supplied payloads
         """
+
         # Group the requests by topic+partition
         original_keys = []
         payloads_by_broker = defaultdict(list)
+
         for payload in payloads:
-            payloads_by_broker[self._get_leader_for_partition(payload.topic, payload.partition)].append(payload)
+            leader = self._get_leader_for_partition(payload.topic,
+                                                    payload.partition)
+
+            payloads_by_broker[leader].append(payload)
             original_keys.append((payload.topic, payload.partition))
 
         # Accumulate the responses in a dictionary
@@ -130,7 +151,8 @@ class KafkaClient(object):
         for broker, payloads in payloads_by_broker.items():
             conn = self._get_conn_for_broker(broker)
             requestId = self._next_id()
-            request = encoder_fn(client_id=KafkaClient.CLIENT_ID, correlation_id=requestId, payloads=payloads)
+            request = encoder_fn(client_id=KafkaClient.CLIENT_ID,
+                                 correlation_id=requestId, payloads=payloads)
 
             # Send the request, recv the response
             conn.send(requestId, request)
@@ -149,33 +171,43 @@ class KafkaClient(object):
         for conn in self.conns.values():
             conn.close()
 
-    def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None):
+    def send_produce_request(self, payloads=[], acks=1, timeout=1000,
+                             fail_on_error=True, callback=None):
         """
         Encode and send some ProduceRequests
 
-        ProduceRequests will be grouped by (topic, partition) and then sent to a specific
-        broker. Output is a list of responses in the same order as the list of payloads
-        specified
+        ProduceRequests will be grouped by (topic, partition) and then
+        sent to a specific broker. Output is a list of responses in the
+        same order as the list of payloads specified
 
         Params
         ======
         payloads: list of ProduceRequest
-        fail_on_error: boolean, should we raise an Exception if we encounter an API error?
-        callback: function, instead of returning the ProduceResponse, first pass it through this function
+        fail_on_error: boolean, should we raise an Exception if we
+                       encounter an API error?
+        callback: function, instead of returning the ProduceResponse,
+                  first pass it through this function
 
         Return
         ======
-        list of ProduceResponse or callback(ProduceResponse), in the order of input payloads
+        list of ProduceResponse or callback(ProduceResponse), in the
+        order of input payloads
         """
-        resps = self._send_broker_aware_request(payloads,
-                partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout),
-                KafkaProtocol.decode_produce_response)
+
+        encoder = partial(KafkaProtocol.encode_produce_request,
+                          acks=acks, timeout=timeout)
+        decoder = KafkaProtocol.decode_produce_response
+        resps = self._send_broker_aware_request(payloads, encoder, decoder)
+
         out = []
         for resp in resps:
             # Check for errors
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("ProduceRequest for %s failed with errorcode=%d" %
-                        (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("ProduceRequest for %s failed with "
+                                "errorcode=%d" % (
+                                TopicAndPartition(resp.topic, resp.partition),
+                                resp.error))
+
             # Run the callback
             if callback is not None:
                 out.append(callback(resp))
@@ -183,22 +215,27 @@ class KafkaClient(object):
                 out.append(resp)
         return out
 
-    def send_fetch_request(self, payloads=[], fail_on_error=True, callback=None):
+    def send_fetch_request(self, payloads=[], fail_on_error=True,
+                           callback=None):
         """
         Encode and send a FetchRequest
-        
-        Payloads are grouped by topic and partition so they can be pipelined to the same
-        brokers.
+
+        Payloads are grouped by topic and partition so they can be pipelined
+        to the same brokers.
         """
         resps = self._send_broker_aware_request(payloads,
                                                 KafkaProtocol.encode_fetch_request,
                                                 KafkaProtocol.decode_fetch_response)
+
         out = []
         for resp in resps:
             # Check for errors
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("FetchRequest for %s failed with errorcode=%d" %
-                        (TopicAndPartition(resp.topic, resp.partition), resp.error))
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("FetchRequest for %s failed with "
+                                "errorcode=%d" % (
+                                TopicAndPartition(resp.topic, resp.partition),
+                                resp.error))
+
             # Run the callback
             if callback is not None:
                 out.append(callback(resp))
@@ -206,43 +243,55 @@ class KafkaClient(object):
                 out.append(resp)
         return out
 
-
-    def send_offset_request(self, payloads=[], fail_on_error=True, callback=None):
+    def send_offset_request(self, payloads=[], fail_on_error=True,
+                            callback=None):
         resps = self._send_broker_aware_request(payloads,
                                                 KafkaProtocol.encode_offset_request,
                                                 KafkaProtocol.decode_offset_response)
+
         out = []
         for resp in resps:
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetRequest failed with errorcode=%s", resp.error)
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetRequest failed with errorcode=%s",
+                                resp.error)
             if callback is not None:
                 out.append(callback(resp))
             else:
                 out.append(resp)
         return out
 
-    def send_offset_commit_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        resps = self._send_broker_aware_request(payloads,
-                partial(KafkaProtocol.encode_offset_commit_request, group=group),
-                KafkaProtocol.decode_offset_commit_response)
+    def send_offset_commit_request(self, group, payloads=[],
+                                   fail_on_error=True, callback=None):
+        encoder = partial(KafkaProtocol.encode_offset_commit_request,
+                          group=group)
+        decoder = KafkaProtocol.decode_offset_commit_response
+        resps = self._send_broker_aware_request(payloads, encoder, decoder)
+
         out = []
         for resp in resps:
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with "
+                                "errorcode=%s", resp.error)
+
             if callback is not None:
                 out.append(callback(resp))
             else:
                 out.append(resp)
         return out
 
-    def send_offset_fetch_request(self, group, payloads=[], fail_on_error=True, callback=None):
-        resps = self._send_broker_aware_request(payloads,
-                partial(KafkaProtocol.encode_offset_fetch_request, group=group),
-                KafkaProtocol.decode_offset_fetch_response)
+    def send_offset_fetch_request(self, group, payloads=[],
+                                  fail_on_error=True, callback=None):
+
+        encoder = partial(KafkaProtocol.encode_offset_fetch_request,
+                          group=group)
+        decoder = KafkaProtocol.decode_offset_fetch_response
+        resps = self._send_broker_aware_request(payloads, encoder, decoder)
+
        out = []
         for resp in resps:
-            if fail_on_error == True and resp.error != ErrorMapping.NO_ERROR:
-                raise Exception("OffsetCommitRequest failed with errorcode=%s", resp.error)
+            if fail_on_error is True and resp.error != ErrorMapping.NO_ERROR:
+                raise Exception("OffsetCommitRequest failed with errorcode=%s",
+                                resp.error)
             if callback is not None:
                 out.append(callback(resp))
             else:
```