From 36b67b79202f8bdeab60de1f4ed582328eeb04db Mon Sep 17 00:00:00 2001
From: David Arthur
Date: Tue, 2 Oct 2012 12:09:19 -0400
Subject: Start work on packaging issue #3

---
 kafka/__init__.py |   1 +
 kafka/kafka.py    | 629 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 630 insertions(+)
 create mode 100644 kafka/__init__.py
 create mode 100644 kafka/kafka.py

(limited to 'kafka')

diff --git a/kafka/__init__.py b/kafka/__init__.py
new file mode 100644
index 0000000..69bc75f
--- /dev/null
+++ b/kafka/__init__.py
@@ -0,0 +1 @@
+__all__ = ["kafka"]
diff --git a/kafka/kafka.py b/kafka/kafka.py
new file mode 100644
index 0000000..ff9f53d
--- /dev/null
+++ b/kafka/kafka.py
@@ -0,0 +1,629 @@
+from collections import namedtuple
+from cStringIO import StringIO
+import logging
+import gzip
+import select
+import socket
+import struct
+import zlib
+
+log = logging.getLogger("org.apache.kafka")
+
+
+# Kafka wire-protocol error codes -> human-readable names (0 means success).
+error_codes = {
+    -1: "UnknownError",
+    0: None,
+    1: "OffsetOutOfRange",
+    2: "InvalidMessage",
+    3: "WrongPartition",
+    4: "InvalidFetchSize"
+}
+
+class KafkaException(Exception):
+    def __init__(self, errorType):
+        self.errorType = errorType
+    def __str__(self):
+        # BUGFIX: original read the bare name `errorType` (NameError);
+        # the value is stored on the instance in __init__.
+        return str(self.errorType)
+
+
+Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"])
+FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"])
+ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
+OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])
+
+def gzip_encode(payload):
+    buf = StringIO()
+    f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
+    f.write(payload)
+    f.close()
+    buf.seek(0)
+    out = buf.read()
+    buf.close()
+    return out
+
+def gzip_decode(payload):
+    buf = StringIO(payload)
+    f = gzip.GzipFile(fileobj=buf, mode='r')
+    out = f.read()
+    f.close()
+    buf.close()
+    return out
+
+
+def length_prefix_message(msg):
+    """
+    Prefix a message with its length as an int
+    """
+    return struct.pack('>i', len(msg)) + msg
+
+class KafkaClient(object):
+    """
+    Request Structure
+    =================
+
+     ::=
+     ::=
+     ::= 0 | 1 | 2 | 3 | 4
+     ::=  |  |  |  |
+
+    Response Structure
+    ==================
+
+     ::=
+     ::=
+     ::= -1 | 0 | 1 | 2 | 3 | 4
+     ::=  |  |  |  |
+
+    Messages are big-endian byte order
+    """
+
+    PRODUCE_KEY = 0
+    FETCH_KEY = 1
+    MULTIFETCH_KEY = 2
+    MULTIPRODUCE_KEY = 3
+    OFFSET_KEY = 4
+
+    ATTRIBUTE_CODEC_MASK = 0x03
+
+    def __init__(self, host, port, bufsize=1024):
+        self.host = host
+        self.port = port
+        self.bufsize = bufsize
+        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self._sock.connect((host, port))
+        self._sock.settimeout(10)
+        log.debug("Connected to %s on %d", host, port)
+
+    ######################
+    #   Protocol Stuff   #
+    ######################
+
+    def _consume_response_iter(self):
+        """
+        This method handles the response header and error messages. It
+        then returns an iterator for the chunks of the response
+        """
+        log.debug("Handling response from Kafka")
+        # Header
+        resp = self._sock.recv(6)
+        if resp == "":
+            raise Exception("Got no response from Kafka")
+        (size, err) = struct.unpack('>iH', resp)
+
+        log.debug("About to read %d bytes from Kafka", size-2)
+        # Handle error
+        error = error_codes.get(err)
+        if error is not None:
+            raise KafkaException(error)
+
+        # Response iterator
+        total = 0
+        while total < (size-2):
+            resp = self._sock.recv(self.bufsize)
+            log.debug("Read %d bytes from Kafka", len(resp))
+            if resp == "":
+                raise Exception("Underflow")
+            total += len(resp)
+            yield resp
+
+    def _consume_response(self):
+        """
+        Fully consume the response iterator
+        """
+        data = ""
+        for chunk in self._consume_response_iter():
+            data += chunk
+        return data
+
+    @classmethod
+    def encode_message(cls, message):
+        """
+        Encode a Message from a Message tuple
+
+        Params
+        ======
+        message: Message
+
+        Wire Format
+        ===========
+         ::=  |
+         ::= 0
+         ::= 1
+         ::=
+         ::=
+         ::=
+         ::=
+         ::=
+         ::=
+
+        The crc is a crc32 checksum of the message payload. The attributes are bitmask
+        used for indicating the compression algorithm.
+        """
+        if message.magic == 0:
+            msg = struct.pack('>Bi%ds' % len(message.payload),
+                              message.magic, message.crc, message.payload)
+        elif message.magic == 1:
+            msg = struct.pack('>BBi%ds' % len(message.payload),
+                              message.magic, message.attributes, message.crc, message.payload)
+        else:
+            raise Exception("Unexpected magic number: %d" % message.magic)
+        msg = length_prefix_message(msg)
+        log.debug("Encoded %s as %r" % (message, msg))
+        return msg
+
+    @classmethod
+    def encode_message_set(cls, messages):
+        """
+        Encode a MessageSet
+
+        One or more concatenated Messages
+        """
+        message_set = ""
+        for message in messages:
+            encoded_message = cls.encode_message(message)
+            message_set += encoded_message
+        return message_set
+
+    @classmethod
+    def encode_produce_request(cls, produceRequest):
+        """
+        Encode a ProduceRequest
+
+        Wire Format
+        ===========
+         ::=
+         ::= 0
+         ::=
+         ::=
+         ::=
+         ::=
+
+        The request-key (0) is encoded as a short (int16).
+        len is the length of the preceding MessageSet
+        """
+        (topic, partition, messages) = produceRequest
+        message_set = cls.encode_message_set(messages)
+        log.debug("Sending MessageSet: %r" % message_set)
+        req = struct.pack('>HH%dsii%ds' % (len(topic), len(message_set)),
+                          KafkaClient.PRODUCE_KEY, len(topic), topic, partition, len(message_set), message_set)
+        return req
+
+    @classmethod
+    def encode_multi_produce_request(cls, produceRequests):
+        """
+        Encode a MultiProducerRequest
+
+        Params
+        ======
+        produceRequest: list of ProduceRequest objects
+
+        Returns
+        =======
+        Encoded request
+
+        Wire Format
+        ===========
+         ::=
+         ::=
+         ::= [  ]
+         ::=
+         ::=
+         ::=
+         ::=
+         ::=
+
+        num is the number of ProduceRequests being encoded
+        """
+        req = struct.pack('>HH', KafkaClient.MULTIPRODUCE_KEY, len(produceRequests))
+        for (topic, partition, messages) in produceRequests:
+            message_set = cls.encode_message_set(messages)
+            req += struct.pack('>H%dsii%ds' % (len(topic), len(message_set)),
+                               len(topic), topic, partition, len(message_set), message_set)
+        return req
+
+    @classmethod
+    def encode_fetch_request(cls, fetchRequest):
+        """
+        Encode a FetchRequest message
+
+        Wire Format
+        ===========
+         ::=
+         ::= 1
+         ::=
+         ::=
+         ::=
+         ::=
+         ::=
+
+        The request-key (1) is encoded as a short (int16).
+        """
+        (topic, partition, offset, size) = fetchRequest
+        req = struct.pack('>HH%dsiqi' % len(topic),
+                          KafkaClient.FETCH_KEY, len(topic), topic, partition, offset, size)
+        return req
+
+    @classmethod
+    def encode_multi_fetch_request(cls, fetchRequests):
+        """
+        Encode the MultiFetchRequest message from a list of FetchRequest objects
+
+        Params
+        ======
+        fetchRequests: list of FetchRequest
+
+        Returns
+        =======
+        req: bytes, The message to send to Kafka
+
+        Wire Format
+        ===========
+         ::= [  ]
+         ::= 2
+         ::=
+         ::= [  ]
+         ::=
+         ::=
+         ::=
+         ::=
+         ::=
+         ::=
+
+        The request-key (2) is encoded as a short (int16).
+        """
+        req = struct.pack('>HH', KafkaClient.MULTIFETCH_KEY, len(fetchRequests))
+        for (topic, partition, offset, size) in fetchRequests:
+            req += struct.pack('>H%dsiqi' % len(topic), len(topic), topic, partition, offset, size)
+        return req
+
+    @classmethod
+    def encode_offset_request(cls, offsetRequest):
+        """
+        Encode an OffsetRequest message
+
+        Wire Format
+        ===========
+         ::=