# From 36b67b79202f8bdeab60de1f4ed582328eeb04db Mon Sep 17 00:00:00 2001
# From: David Arthur
# Date: Tue, 2 Oct 2012 12:09:19 -0400
# Subject: Start work on packaging issue #3
#
#  kafka.py | 629 ---------------------------------------------------------------
#  1 file changed, 629 deletions(-)
#  delete mode 100644 kafka.py
#
# diff --git a/kafka.py b/kafka.py
# deleted file mode 100644
# index ff9f53d..0000000
# --- a/kafka.py
# +++ /dev/null
# @@ -1,629 +0,0 @@
#
# What follows is the content of kafka.py as removed by the patch above,
# restored to conventional formatting.

from collections import namedtuple
try:
    from cStringIO import StringIO
except ImportError:
    # cStringIO does not exist on Python 3; BytesIO is the byte-oriented
    # equivalent, which is what the gzip helpers below need.
    from io import BytesIO as StringIO
import logging
import gzip
import select
import socket
import struct
import zlib

log = logging.getLogger("org.apache.kafka")


# Error codes carried in the response header, mapped to a readable name.
# 0 means "no error".
error_codes = {
    -1: "UnknownError",
    0: None,
    1: "OffsetOutOfRange",
    2: "InvalidMessage",
    3: "WrongPartition",
    4: "InvalidFetchSize"
}


class KafkaException(Exception):
    """
    Raised when the response header carries a non-zero error code.

    Params
    ======
    errorType: the error name (a value of ``error_codes``)
    """

    def __init__(self, errorType):
        # Populate Exception.args as well so repr() and pickling work.
        super(KafkaException, self).__init__(errorType)
        self.errorType = errorType

    def __str__(self):
        # Must read the attribute; a bare ``errorType`` is an undefined name.
        return str(self.errorType)


Message = namedtuple("Message", ["magic", "attributes", "crc", "payload"])
FetchRequest = namedtuple("FetchRequest", ["topic", "partition", "offset", "size"])
ProduceRequest = namedtuple("ProduceRequest", ["topic", "partition", "messages"])
OffsetRequest = namedtuple("OffsetRequest", ["topic", "partition", "time", "maxOffsets"])


def gzip_encode(payload):
    """
    Compress a byte string with gzip (compression level 6)

    Params
    ======
    payload: byte string to compress

    Returns
    =======
    The gzip-compressed byte string
    """
    buf = StringIO()
    f = gzip.GzipFile(fileobj=buf, mode='w', compresslevel=6)
    try:
        f.write(payload)
    finally:
        # Closing flushes the gzip trailer into buf.
        f.close()
    return buf.getvalue()


def gzip_decode(payload):
    """
    Decompress a gzip-compressed byte string

    Params
    ======
    payload: gzip-compressed byte string

    Returns
    =======
    The decompressed byte string
    """
    buf = StringIO(payload)
    f = gzip.GzipFile(fileobj=buf, mode='r')
    try:
        return f.read()
    finally:
        f.close()
        buf.close()


def length_prefix_message(msg):
    """
    Prefix a message with its length as a big-endian int32
    """
    return struct.pack('>i', len(msg)) + msg


class KafkaClient(object):
    """
    Socket client that encodes Kafka requests and reads Kafka responses.

    Request keys
    ============
    0 (produce) | 1 (fetch) | 2 (multifetch) | 3 (multiproduce) | 4 (offset)

    Each request key is encoded as an int16 at the start of the request.

    Response Structure
    ==================
    int32 size, int16 error code, then (size - 2) bytes of payload.
    The error code is one of -1 | 0 | 1 | 2 | 3 | 4 (see ``error_codes``).

    Messages are big-endian byte order
    """

    PRODUCE_KEY = 0
    FETCH_KEY = 1
    MULTIFETCH_KEY = 2
    MULTIPRODUCE_KEY = 3
    OFFSET_KEY = 4

    ATTRIBUTE_CODEC_MASK = 0x03

    # Response header: int32 size followed by a *signed* int16 error code
    # (signed so that -1, "UnknownError", can be represented).
    _RESPONSE_HEADER = struct.Struct('>ih')

    def __init__(self, host, port, bufsize=1024):
        """
        Open a TCP connection to ``host``:``port``.

        Params
        ======
        host: host name or address to connect to
        port: TCP port
        bufsize: maximum number of bytes requested per ``recv`` call
        """
        self.host = host
        self.port = port
        self.bufsize = bufsize
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._sock.connect((host, port))
            self._sock.settimeout(10)
        except Exception:
            # Do not leak the descriptor when the connection cannot be set up.
            self._sock.close()
            raise
        log.debug("Connected to %s on %d", host, port)

    ######################
    #   Protocol Stuff   #
    ######################

    def _consume_response_iter(self):
        """
        This method handles the response header and error messages. It
        then returns an iterator for the chunks of the response

        Raises
        ======
        KafkaException: the header carries a non-zero error code
        Exception: the socket returned no data ("Got no response from Kafka")
                   or closed before the full response arrived ("Underflow")
        """
        log.debug("Handling response from Kafka")
        # Header. recv() may return fewer bytes than requested, so keep
        # reading until the whole header has arrived.
        header_size = self._RESPONSE_HEADER.size
        header = b""
        while len(header) < header_size:
            chunk = self._sock.recv(header_size - len(header))
            if not chunk:
                if not header:
                    raise Exception("Got no response from Kafka")
                raise Exception("Underflow")
            header += chunk
        (size, err) = self._RESPONSE_HEADER.unpack(header)

        log.debug("About to read %d bytes from Kafka", size - 2)
        # Handle error. Any non-zero code is an error, including codes that
        # are missing from the error_codes table.
        if err != 0:
            raise KafkaException(error_codes.get(err, "UnknownError"))

        # Response iterator. ``size`` counts the 2-byte error code, which has
        # already been consumed. Never ask for more than what is left, so
        # bytes belonging to a later response stay in the socket.
        remaining = size - 2
        while remaining > 0:
            resp = self._sock.recv(min(self.bufsize, remaining))
            log.debug("Read %d bytes from Kafka", len(resp))
            if not resp:
                raise Exception("Underflow")
            remaining -= len(resp)
            yield resp

    def _consume_response(self):
        """
        Fully consume the response iterator and return the payload bytes
        """
        return b"".join(self._consume_response_iter())

    @classmethod
    def encode_message(cls, message):
        """
        Encode a Message from a Message tuple

        Params
        ======
        message: Message

        Wire Format
        ===========
        int32 length, followed by one of:
          magic 0: int8 magic (0), int32 crc, payload bytes
          magic 1: int8 magic (1), int8 attributes, int32 crc, payload bytes

        The crc is a crc32 checksum of the message payload. The attributes are bitmask
        used for indicating the compression algorithm.

        Raises
        ======
        Exception: the magic number is neither 0 nor 1
        """
        # crc32 is reported as signed or unsigned depending on the Python
        # version; masking and packing as unsigned yields the same four bytes
        # for both representations.
        crc = message.crc & 0xffffffff
        if message.magic == 0:
            msg = struct.pack('>BI%ds' % len(message.payload),
                              message.magic, crc, message.payload)
        elif message.magic == 1:
            msg = struct.pack('>BBI%ds' % len(message.payload),
                              message.magic, message.attributes, crc, message.payload)
        else:
            raise Exception("Unexpected magic number: %d" % message.magic)
        msg = length_prefix_message(msg)
        log.debug("Encoded %s as %r", message, msg)
        return msg

    @classmethod
    def encode_message_set(cls, messages):
        """
        Encode a MessageSet

        Zero or more concatenated, length-prefixed Messages
        """
        return b"".join(cls.encode_message(message) for message in messages)

    @classmethod
    def encode_produce_request(cls, produceRequest):
        """
        Encode a ProduceRequest

        Params
        ======
        produceRequest: ProduceRequest (topic is a byte string)

        Wire Format
        ===========
        int16 request-key (0), int16 topic length, topic bytes,
        int32 partition, int32 len, MessageSet

        The request-key (0) is encoded as a short (int16). len is the length of the
        MessageSet that follows it
        """
        (topic, partition, messages) = produceRequest
        message_set = cls.encode_message_set(messages)
        log.debug("Sending MessageSet: %r", message_set)
        req = struct.pack('>HH%dsii%ds' % (len(topic), len(message_set)),
                          cls.PRODUCE_KEY, len(topic), topic, partition,
                          len(message_set), message_set)
        return req

    @classmethod
    def encode_multi_produce_request(cls, produceRequests):
        """
        Encode a MultiProducerRequest

        Params
        ======
        produceRequest: list of ProduceRequest objects

        Returns
        =======
        Encoded request

        Wire Format
        ===========
        int16 request-key (3), int16 num, then for each ProduceRequest:
        int16 topic length, topic bytes, int32 partition, int32 len, MessageSet

        num is the number of ProduceRequests being encoded
        """
        parts = [struct.pack('>HH', cls.MULTIPRODUCE_KEY, len(produceRequests))]
        for (topic, partition, messages) in produceRequests:
            message_set = cls.encode_message_set(messages)
            parts.append(struct.pack('>H%dsii%ds' % (len(topic), len(message_set)),
                                     len(topic), topic, partition,
                                     len(message_set), message_set))
        return b"".join(parts)

    @classmethod
    def encode_fetch_request(cls, fetchRequest):
        """
        Encode a FetchRequest message

        Params
        ======
        fetchRequest: FetchRequest (topic is a byte string)

        Wire Format
        ===========
        int16 request-key (1), int16 topic length, topic bytes,
        int32 partition, int64 offset, int32 size

        The request-key (1) is encoded as a short (int16).
        """
        (topic, partition, offset, size) = fetchRequest
        req = struct.pack('>HH%dsiqi' % len(topic),
                          cls.FETCH_KEY, len(topic), topic, partition, offset, size)
        return req

    @classmethod
    def encode_multi_fetch_request(cls, fetchRequests):
        """
        Encode the MultiFetchRequest message from a list of FetchRequest objects

        Params
        ======
        fetchRequests: list of FetchRequest

        Returns
        =======
        req: bytes, The message to send to Kafka

        Wire Format
        ===========
        int16 request-key (2), int16 num, then for each FetchRequest:
        int16 topic length, topic bytes, int32 partition, int64 offset, int32 size

        The request-key (2) is encoded as a short (int16).
        """
        parts = [struct.pack('>HH', cls.MULTIFETCH_KEY, len(fetchRequests))]
        for (topic, partition, offset, size) in fetchRequests:
            parts.append(struct.pack('>H%dsiqi' % len(topic),
                                     len(topic), topic, partition, offset, size))
        return b"".join(parts)

    # NOTE(review): the original continues here with
    #     @classmethod
    #     def encode_offset_request(cls, offsetRequest):
    #         """
    #         Encode an OffsetRequest message
    #
    #         Wire Format
    #         ===========
    # Its docstring and body run past the end of this excerpt, so the method
    # is left untouched rather than reconstructed from a partial view.