author | Dana Powers <dana.powers@rd.io> | 2016-01-07 18:51:14 -0800
---|---|---
committer | Dana Powers <dana.powers@rd.io> | 2016-01-07 18:51:14 -0800
commit | 828377377da43749af0d27ee256ef31bf714cf17 (patch) |
tree | fbad4d4381fc4d1ea2be7ce2009214d18fbeb674 /kafka/protocol |
parent | 71e7568fcb8132899f366b37c32645fd5a40dc4b (diff) |
parent | 9a8af1499ca425366d934487469d9977fae7fe5f (diff) |
download | kafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz |
Merge branch '0.9'
Conflicts:
kafka/codec.py
kafka/version.py
test/test_producer.py
test/test_producer_integration.py
Diffstat (limited to 'kafka/protocol')
-rw-r--r-- | kafka/protocol/__init__.py | 6
-rw-r--r-- | kafka/protocol/abstract.py | 17
-rw-r--r-- | kafka/protocol/admin.py | 44
-rw-r--r-- | kafka/protocol/api.py | 16
-rw-r--r-- | kafka/protocol/commit.py | 119
-rw-r--r-- | kafka/protocol/fetch.py | 32
-rw-r--r-- | kafka/protocol/group.py | 103
-rw-r--r-- | kafka/protocol/legacy.py | 440
-rw-r--r-- | kafka/protocol/message.py | 144
-rw-r--r-- | kafka/protocol/metadata.py | 29
-rw-r--r-- | kafka/protocol/offset.py | 36
-rw-r--r-- | kafka/protocol/pickle.py | 29
-rw-r--r-- | kafka/protocol/produce.py | 29
-rw-r--r-- | kafka/protocol/struct.py | 64
-rw-r--r-- | kafka/protocol/types.py | 141
15 files changed, 1249 insertions, 0 deletions
diff --git a/kafka/protocol/__init__.py b/kafka/protocol/__init__.py
new file mode 100644
index 0000000..7b2a2f3
--- /dev/null
+++ b/kafka/protocol/__init__.py
@@ -0,0 +1,6 @@
+from .legacy import (
+    create_message, create_gzip_message,
+    create_snappy_message, create_message_set,
+    CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
+    ATTRIBUTE_CODEC_MASK, KafkaProtocol,
+)
diff --git a/kafka/protocol/abstract.py b/kafka/protocol/abstract.py
new file mode 100644
index 0000000..160678f
--- /dev/null
+++ b/kafka/protocol/abstract.py
@@ -0,0 +1,17 @@
+import abc
+
+
+class AbstractType(object):
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def encode(cls, value):  # pylint: disable=no-self-argument
+        pass
+
+    @abc.abstractmethod
+    def decode(cls, data):  # pylint: disable=no-self-argument
+        pass
+
+    @classmethod
+    def repr(cls, value):
+        return repr(value)
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
new file mode 100644
index 0000000..56dd042
--- /dev/null
+++ b/kafka/protocol/admin.py
@@ -0,0 +1,44 @@
+from .struct import Struct
+from .types import Array, Bytes, Int16, Schema, String
+
+
+class ListGroupsResponse(Struct):
+    SCHEMA = Schema(
+        ('error_code', Int16),
+        ('groups', Array(
+            ('group', String('utf-8')),
+            ('protocol_type', String('utf-8'))))
+    )
+
+
+class ListGroupsRequest(Struct):
+    API_KEY = 16
+    API_VERSION = 0
+    RESPONSE_TYPE = ListGroupsResponse
+    SCHEMA = Schema()
+
+
+class DescribeGroupsResponse(Struct):
+    SCHEMA = Schema(
+        ('groups', Array(
+            ('error_code', Int16),
+            ('group', String('utf-8')),
+            ('state', String('utf-8')),
+            ('protocol_type', String('utf-8')),
+            ('protocol', String('utf-8')),
+            ('members', Array(
+                ('member_id', String('utf-8')),
+                ('client_id', String('utf-8')),
+                ('client_host', String('utf-8')),
+                ('member_metadata', Bytes),
+                ('member_assignment', Bytes)))))
+    )
+
+
+class DescribeGroupsRequest(Struct):
+    API_KEY = 15
+    API_VERSION = 0
+    RESPONSE_TYPE = DescribeGroupsResponse
+    SCHEMA = Schema(
+        ('groups', Array(String('utf-8')))
+    )
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py
new file mode 100644
index 0000000..0c23437
--- /dev/null
+++ b/kafka/protocol/api.py
@@ -0,0 +1,16 @@
+from .struct import Struct
+from .types import Int16, Int32, String, Schema
+
+
+class RequestHeader(Struct):
+    SCHEMA = Schema(
+        ('api_key', Int16),
+        ('api_version', Int16),
+        ('correlation_id', Int32),
+        ('client_id', String('utf-8'))
+    )
+
+    def __init__(self, request, correlation_id=0, client_id='kafka-python'):
+        super(RequestHeader, self).__init__(
+            request.API_KEY, request.API_VERSION, correlation_id, client_id
+        )
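A short usage sketch for the header and admin classes above (not part of the diff; the client id and correlation id are arbitrary example values): `RequestHeader` reads `API_KEY` and `API_VERSION` straight off the request class, so the same envelope code serves every request type.

```python
from kafka.protocol.admin import ListGroupsRequest
from kafka.protocol.api import RequestHeader

# ListGroupsRequest has an empty schema, so it takes no arguments
request = ListGroupsRequest()
header = RequestHeader(request, correlation_id=1, client_id='example-client')

# The full wire payload is header bytes followed by request bytes
wire_bytes = header.encode() + request.encode()
```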
diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py
new file mode 100644
index 0000000..a32f8d3
--- /dev/null
+++ b/kafka/protocol/commit.py
@@ -0,0 +1,119 @@
+from .struct import Struct
+from .types import Array, Int16, Int32, Int64, Schema, String
+
+
+class OffsetCommitResponse(Struct):
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16)))))
+    )
+
+
+class OffsetCommitRequest_v2(Struct):
+    API_KEY = 8
+    API_VERSION = 2  # added retention_time, dropped timestamp
+    RESPONSE_TYPE = OffsetCommitResponse
+    SCHEMA = Schema(
+        ('consumer_group', String('utf-8')),
+        ('consumer_group_generation_id', Int32),
+        ('consumer_id', String('utf-8')),
+        ('retention_time', Int64),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('offset', Int64),
+                ('metadata', String('utf-8'))))))
+    )
+    DEFAULT_GENERATION_ID = -1
+    DEFAULT_RETENTION_TIME = -1
+
+
+class OffsetCommitRequest_v1(Struct):
+    API_KEY = 8
+    API_VERSION = 1  # Kafka-backed storage
+    RESPONSE_TYPE = OffsetCommitResponse
+    SCHEMA = Schema(
+        ('consumer_group', String('utf-8')),
+        ('consumer_group_generation_id', Int32),
+        ('consumer_id', String('utf-8')),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('offset', Int64),
+                ('timestamp', Int64),
+                ('metadata', String('utf-8'))))))
+    )
+
+
+class OffsetCommitRequest_v0(Struct):
+    API_KEY = 8
+    API_VERSION = 0  # Zookeeper-backed storage
+    RESPONSE_TYPE = OffsetCommitResponse
+    SCHEMA = Schema(
+        ('consumer_group', String('utf-8')),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('offset', Int64),
+                ('metadata', String('utf-8'))))))
+    )
+
+
+class OffsetFetchResponse(Struct):
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('offset', Int64),
+                ('metadata', String('utf-8')),
+                ('error_code', Int16)))))
+    )
+
+
+class OffsetFetchRequest_v1(Struct):
+    API_KEY = 9
+    API_VERSION = 1  # kafka-backed storage
+    RESPONSE_TYPE = OffsetFetchResponse
+    SCHEMA = Schema(
+        ('consumer_group', String('utf-8')),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(Int32))))
+    )
+
+
+class OffsetFetchRequest_v0(Struct):
+    API_KEY = 9
+    API_VERSION = 0  # zookeeper-backed storage
+    RESPONSE_TYPE = OffsetFetchResponse
+    SCHEMA = Schema(
+        ('consumer_group', String('utf-8')),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(Int32))))
+    )
+
+
+class GroupCoordinatorResponse(Struct):
+    SCHEMA = Schema(
+        ('error_code', Int16),
+        ('coordinator_id', Int32),
+        ('host', String('utf-8')),
+        ('port', Int32)
+    )
+
+
+class GroupCoordinatorRequest(Struct):
+    API_KEY = 10
+    API_VERSION = 0
+    RESPONSE_TYPE = GroupCoordinatorResponse
+    SCHEMA = Schema(
+        ('consumer_group', String('utf-8'))
+    )
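The three commit versions differ in storage backend and bookkeeping fields: v0 writes offsets to Zookeeper, v1 adds the group generation/consumer id plus a per-partition timestamp, and v2 replaces the timestamp with a retention time. A construction sketch (not part of the diff; group, topic, offset, and metadata values are made up):

```python
from kafka.protocol.commit import OffsetCommitRequest_v0, OffsetCommitRequest_v2

# v0: Zookeeper-backed; schema is (group, [(topic, [(partition, offset, metadata)])])
v0 = OffsetCommitRequest_v0(
    consumer_group='example-group',
    topics=[('example-topic', [(0, 123, 'checkpoint')])])

# v2: Kafka-backed; adds generation id, consumer id, and retention time
v2 = OffsetCommitRequest_v2(
    consumer_group='example-group',
    consumer_group_generation_id=OffsetCommitRequest_v2.DEFAULT_GENERATION_ID,
    consumer_id='',
    retention_time=OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
    topics=[('example-topic', [(0, 123, 'checkpoint')])])

wire_bytes = v0.encode()
```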
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
new file mode 100644
index 0000000..e00c9ab
--- /dev/null
+++ b/kafka/protocol/fetch.py
@@ -0,0 +1,32 @@
+from .message import MessageSet
+from .struct import Struct
+from .types import Array, Int16, Int32, Int64, Schema, String
+
+
+class FetchResponse(Struct):
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topics', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('highwater_offset', Int64),
+                ('message_set', MessageSet)))))
+    )
+
+
+class FetchRequest(Struct):
+    API_KEY = 1
+    API_VERSION = 0
+    RESPONSE_TYPE = FetchResponse
+    SCHEMA = Schema(
+        ('replica_id', Int32),
+        ('max_wait_time', Int32),
+        ('min_bytes', Int32),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('offset', Int64),
+                ('max_bytes', Int32)))))
+    )
diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py
new file mode 100644
index 0000000..72de005
--- /dev/null
+++ b/kafka/protocol/group.py
@@ -0,0 +1,103 @@
+from .struct import Struct
+from .types import Array, Bytes, Int16, Int32, Schema, String
+
+
+class JoinGroupResponse(Struct):
+    SCHEMA = Schema(
+        ('error_code', Int16),
+        ('generation_id', Int32),
+        ('group_protocol', String('utf-8')),
+        ('leader_id', String('utf-8')),
+        ('member_id', String('utf-8')),
+        ('members', Array(
+            ('member_id', String('utf-8')),
+            ('member_metadata', Bytes)))
+    )
+
+
+class JoinGroupRequest(Struct):
+    API_KEY = 11
+    API_VERSION = 0
+    RESPONSE_TYPE = JoinGroupResponse
+    SCHEMA = Schema(
+        ('group', String('utf-8')),
+        ('session_timeout', Int32),
+        ('member_id', String('utf-8')),
+        ('protocol_type', String('utf-8')),
+        ('group_protocols', Array(
+            ('protocol_name', String('utf-8')),
+            ('protocol_metadata', Bytes)))
+    )
+    UNKNOWN_MEMBER_ID = ''
+
+
+class ProtocolMetadata(Struct):
+    SCHEMA = Schema(
+        ('version', Int16),
+        ('subscription', Array(String('utf-8'))),  # topics list
+        ('user_data', Bytes)
+    )
+
+
+class SyncGroupResponse(Struct):
+    SCHEMA = Schema(
+        ('error_code', Int16),
+        ('member_assignment', Bytes)
+    )
+
+
+class SyncGroupRequest(Struct):
+    API_KEY = 14
+    API_VERSION = 0
+    RESPONSE_TYPE = SyncGroupResponse
+    SCHEMA = Schema(
+        ('group', String('utf-8')),
+        ('generation_id', Int32),
+        ('member_id', String('utf-8')),
+        ('group_assignment', Array(
+            ('member_id', String('utf-8')),
+            ('member_metadata', Bytes)))
+    )
+
+
+class MemberAssignment(Struct):
+    SCHEMA = Schema(
+        ('version', Int16),
+        ('partition_assignment', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(Int32)))),
+        ('user_data', Bytes)
+    )
+
+
+class HeartbeatResponse(Struct):
+    SCHEMA = Schema(
+        ('error_code', Int16)
+    )
+
+
+class HeartbeatRequest(Struct):
+    API_KEY = 12
+    API_VERSION = 0
+    RESPONSE_TYPE = HeartbeatResponse
+    SCHEMA = Schema(
+        ('group', String('utf-8')),
+        ('generation_id', Int32),
+        ('member_id', String('utf-8'))
+    )
+
+
+class LeaveGroupResponse(Struct):
+    SCHEMA = Schema(
+        ('error_code', Int16)
+    )
+
+
+class LeaveGroupRequest(Struct):
+    API_KEY = 13
+    API_VERSION = 0
+    RESPONSE_TYPE = LeaveGroupResponse
+    SCHEMA = Schema(
+        ('group', String('utf-8')),
+        ('member_id', String('utf-8'))
+    )
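`ProtocolMetadata` and `MemberAssignment` are not requests themselves: they describe the opaque `Bytes` blobs that JoinGroup and SyncGroup carry between members and the group leader. A sketch of how a consumer-style client might fill them (not part of the diff; the group, topic, and timeout values are illustrative):

```python
from kafka.protocol.group import JoinGroupRequest, ProtocolMetadata

# The subscription is serialized separately and shipped as opaque bytes
metadata = ProtocolMetadata(0, ['example-topic'], b'')

join = JoinGroupRequest(
    group='example-group',
    session_timeout=30000,
    member_id=JoinGroupRequest.UNKNOWN_MEMBER_ID,  # '' on first join
    protocol_type='consumer',
    group_protocols=[('range', metadata.encode())])
```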
+ """ + PRODUCE_KEY = 0 + FETCH_KEY = 1 + OFFSET_KEY = 2 + METADATA_KEY = 3 + OFFSET_COMMIT_KEY = 8 + OFFSET_FETCH_KEY = 9 + CONSUMER_METADATA_KEY = 10 + + ################### + # Private API # + ################### + + @classmethod + def _encode_message_header(cls, client_id, correlation_id, request_key, + version=0): + """ + Encode the common request envelope + """ + return struct.pack('>hhih%ds' % len(client_id), + request_key, # ApiKey + version, # ApiVersion + correlation_id, # CorrelationId + len(client_id), # ClientId size + client_id) # ClientId + + @classmethod + def _encode_message_set(cls, messages): + """ + Encode a MessageSet. Unlike other arrays in the protocol, + MessageSets are not length-prefixed + + Format + ====== + MessageSet => [Offset MessageSize Message] + Offset => int64 + MessageSize => int32 + """ + message_set = [] + for message in messages: + encoded_message = KafkaProtocol._encode_message(message) + message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0, + len(encoded_message), + encoded_message)) + return b''.join(message_set) + + @classmethod + def _encode_message(cls, message): + """ + Encode a single message. + + The magic number of a message is a format version number. + The only supported magic number right now is zero + + Format + ====== + Message => Crc MagicByte Attributes Key Value + Crc => int32 + MagicByte => int8 + Attributes => int8 + Key => bytes + Value => bytes + """ + if message.magic == 0: + msg = b''.join([ + struct.pack('>BB', message.magic, message.attributes), + write_int_string(message.key), + write_int_string(message.value) + ]) + crc = crc32(msg) + msg = struct.pack('>i%ds' % len(msg), crc, msg) + else: + raise ProtocolError("Unexpected magic number: %d" % message.magic) + return msg + + ################## + # Public API # + ################## + + @classmethod + def encode_produce_request(cls, payloads=(), acks=1, timeout=1000): + """ + Encode a ProduceRequest struct + + Arguments: + payloads: list of ProduceRequestPayload + acks: How "acky" you want the request to be + 1: written to disk by the leader + 0: immediate response + -1: waits for all replicas to be in sync + timeout: Maximum time (in ms) the server will wait for replica acks. + This is _not_ a socket timeout + + Returns: ProduceRequest + """ + if acks not in (1, 0, -1): + raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks) + + return kafka.protocol.produce.ProduceRequest( + required_acks=acks, + timeout=timeout, + topics=[( + topic, + [( + partition, + [(0, 0, kafka.protocol.message.Message(msg.value, key=msg.key, + magic=msg.magic, + attributes=msg.attributes)) + for msg in payload.messages]) + for partition, payload in topic_payloads.items()]) + for topic, topic_payloads in group_by_topic_and_partition(payloads).items()]) + + @classmethod + def decode_produce_response(cls, response): + """ + Decode ProduceResponse to ProduceResponsePayload + + Arguments: + response: ProduceResponse + + Return: list of ProduceResponsePayload + """ + return [ + kafka.common.ProduceResponsePayload(topic, partition, error, offset) + for topic, partitions in response.topics + for partition, error, offset in partitions + ] + + @classmethod + def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096): + """ + Encodes a FetchRequest struct + + Arguments: + payloads: list of FetchRequestPayload + max_wait_time (int, optional): ms to block waiting for min_bytes + data. Defaults to 100. 
+
+    @classmethod
+    def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
+        """
+        Encodes a FetchRequest struct
+
+        Arguments:
+            payloads: list of FetchRequestPayload
+            max_wait_time (int, optional): ms to block waiting for min_bytes
+                data. Defaults to 100.
+            min_bytes (int, optional): minimum bytes required to return before
+                max_wait_time. Defaults to 4096.
+
+        Return: FetchRequest
+        """
+        return kafka.protocol.fetch.FetchRequest(
+            replica_id=-1,
+            max_wait_time=max_wait_time,
+            min_bytes=min_bytes,
+            topics=[(
+                topic,
+                [(
+                    partition,
+                    payload.offset,
+                    payload.max_bytes)
+                 for partition, payload in topic_payloads.items()])
+            for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
+
+    @classmethod
+    def decode_fetch_response(cls, response):
+        """
+        Decode FetchResponse struct to FetchResponsePayloads
+
+        Arguments:
+            response: FetchResponse
+        """
+        return [
+            kafka.common.FetchResponsePayload(
+                topic, partition, error, highwater_offset, [
+                    kafka.common.OffsetAndMessage(offset, message)
+                    for offset, _, message in messages])
+            for topic, partitions in response.topics
+            for partition, error, highwater_offset, messages in partitions
+        ]
+
+    @classmethod
+    def encode_offset_request(cls, payloads=()):
+        return kafka.protocol.offset.OffsetRequest(
+            replica_id=-1,
+            topics=[(
+                topic,
+                [(
+                    partition,
+                    payload.time,
+                    payload.max_offsets)
+                 for partition, payload in six.iteritems(topic_payloads)])
+            for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+
+    @classmethod
+    def decode_offset_response(cls, response):
+        """
+        Decode OffsetResponse into OffsetResponsePayloads
+
+        Arguments:
+            response: OffsetResponse
+
+        Returns: list of OffsetResponsePayloads
+        """
+        return [
+            kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+            for topic, partitions in response.topics
+            for partition, error, offsets in partitions
+        ]
+
+    @classmethod
+    def encode_metadata_request(cls, topics=(), payloads=None):
+        """
+        Encode a MetadataRequest
+
+        Arguments:
+            topics: list of strings
+        """
+        if payloads is not None:
+            topics = payloads
+
+        return kafka.protocol.metadata.MetadataRequest(topics)
+
+    @classmethod
+    def decode_metadata_response(cls, response):
+        return response
+
+    @classmethod
+    def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
+        """
+        Encode a ConsumerMetadataRequest
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            payloads: string (consumer group)
+        """
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.CONSUMER_METADATA_KEY))
+        message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
+
+        msg = b''.join(message)
+        return write_int_string(msg)
+
+    @classmethod
+    def decode_consumer_metadata_response(cls, data):
+        """
+        Decode bytes to a ConsumerMetadataResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+        ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
+        (host, cur) = read_short_string(data, cur)
+        ((port,), cur) = relative_unpack('>i', data, cur)
+
+        return ConsumerMetadataResponse(error, nodeId, host, port)
+
+    @classmethod
+    def encode_offset_commit_request(cls, group, payloads):
+        """
+        Encode an OffsetCommitRequest struct
+
+        Arguments:
+            group: string, the consumer group you are committing offsets for
+            payloads: list of OffsetCommitRequestPayload
+        """
+        return kafka.protocol.commit.OffsetCommitRequest_v0(
+            consumer_group=group,
+            topics=[(
+                topic,
+                [(
+                    partition,
+                    payload.offset,
+                    payload.metadata)
+                 for partition, payload in six.iteritems(topic_payloads)])
+            for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
""" + Decode OffsetCommitResponse to an OffsetCommitResponsePayload + + Arguments: + response: OffsetCommitResponse + """ + return [ + kafka.common.OffsetCommitResponsePayload(topic, partition, error) + for topic, partitions in response.topics + for partition, error in partitions + ] + + @classmethod + def encode_offset_fetch_request(cls, group, payloads, from_kafka=False): + """ + Encode an OffsetFetchRequest struct. The request is encoded using + version 0 if from_kafka is false, indicating a request for Zookeeper + offsets. It is encoded using version 1 otherwise, indicating a request + for Kafka offsets. + + Arguments: + group: string, the consumer group you are fetching offsets for + payloads: list of OffsetFetchRequestPayload + from_kafka: bool, default False, set True for Kafka-committed offsets + """ + if from_kafka: + request_class = kafka.protocol.commit.OffsetFetchRequest_v1 + else: + request_class = kafka.protocol.commit.OffsetFetchRequest_v0 + + return request_class( + consumer_group=group, + topics=[( + topic, + list(topic_payloads.keys())) + for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))]) + + @classmethod + def decode_offset_fetch_response(cls, response): + """ + Decode OffsetFetchResponse to OffsetFetchResponsePayloads + + Arguments: + response: OffsetFetchResponse + """ + return [ + kafka.common.OffsetFetchResponsePayload( + topic, partition, offset, metadata, error + ) + for topic, partitions in response.topics + for partition, offset, metadata, error in partitions + ] + + +def create_message(payload, key=None): + """ + Construct a Message + + Arguments: + payload: bytes, the payload to send to Kafka + key: bytes, a key used for partition routing (optional) + + """ + return kafka.common.Message(0, 0, key, payload) + + +def create_gzip_message(payloads, key=None, compresslevel=None): + """ + Construct a Gzipped Message containing multiple Messages + + The given payloads will be encoded, compressed, and sent as a single atomic + message to Kafka. + + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + + """ + message_set = KafkaProtocol._encode_message_set( + [create_message(payload, pl_key) for payload, pl_key in payloads]) + + gzipped = gzip_encode(message_set, compresslevel=compresslevel) + codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP + + return kafka.common.Message(0, 0x00 | codec, key, gzipped) + + +def create_snappy_message(payloads, key=None): + """ + Construct a Snappy Message containing multiple Messages + + The given payloads will be encoded, compressed, and sent as a single atomic + message to Kafka. + + Arguments: + payloads: list(bytes), a list of payload to send be sent to Kafka + key: bytes, a key used for partition routing (optional) + + """ + message_set = KafkaProtocol._encode_message_set( + [create_message(payload, pl_key) for payload, pl_key in payloads]) + + snapped = snappy_encode(message_set) + codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY + + return kafka.common.Message(0, 0x00 | codec, key, snapped) + + +def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None): + """Create a message set using the given codec. + + If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise, + return a list containing a single codec-encoded message. 
+ """ + if codec == CODEC_NONE: + return [create_message(m, k) for m, k in messages] + elif codec == CODEC_GZIP: + return [create_gzip_message(messages, key, compresslevel)] + elif codec == CODEC_SNAPPY: + return [create_snappy_message(messages, key)] + else: + raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec) diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py new file mode 100644 index 0000000..2648e24 --- /dev/null +++ b/kafka/protocol/message.py @@ -0,0 +1,144 @@ +import io + +from ..codec import gzip_decode, snappy_decode +from . import pickle +from .struct import Struct +from .types import ( + Int8, Int32, Int64, Bytes, Schema, AbstractType +) +from ..util import crc32 + + +class Message(Struct): + SCHEMA = Schema( + ('crc', Int32), + ('magic', Int8), + ('attributes', Int8), + ('key', Bytes), + ('value', Bytes) + ) + CODEC_MASK = 0x03 + CODEC_GZIP = 0x01 + CODEC_SNAPPY = 0x02 + + def __init__(self, value, key=None, magic=0, attributes=0, crc=0): + assert value is None or isinstance(value, bytes), 'value must be bytes' + assert key is None or isinstance(key, bytes), 'key must be bytes' + self.crc = crc + self.magic = magic + self.attributes = attributes + self.key = key + self.value = value + self.encode = self._encode_self + + def _encode_self(self, recalc_crc=True): + message = Message.SCHEMA.encode( + (self.crc, self.magic, self.attributes, self.key, self.value) + ) + if not recalc_crc: + return message + self.crc = crc32(message[4:]) + return self.SCHEMA.fields[0].encode(self.crc) + message[4:] + + @classmethod + def decode(cls, data): + if isinstance(data, bytes): + data = io.BytesIO(data) + fields = [field.decode(data) for field in cls.SCHEMA.fields] + return cls(fields[4], key=fields[3], + magic=fields[1], attributes=fields[2], crc=fields[0]) + + def validate_crc(self): + raw_msg = self._encode_self(recalc_crc=False) + crc = crc32(raw_msg[4:]) + if crc == self.crc: + return True + return False + + def is_compressed(self): + return self.attributes & self.CODEC_MASK != 0 + + def decompress(self): + codec = self.attributes & self.CODEC_MASK + assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY) + if codec == self.CODEC_GZIP: + raw_bytes = gzip_decode(self.value) + else: + raw_bytes = snappy_decode(self.value) + + return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes)) + + def __hash__(self): + return hash(self._encode_self(recalc_crc=False)) + + +class PartialMessage(bytes): + def __repr__(self): + return 'PartialMessage(%s)' % self + + +class MessageSet(AbstractType): + ITEM = Schema( + ('offset', Int64), + ('message_size', Int32), + ('message', Message.SCHEMA) + ) + + @classmethod + def encode(cls, items, size=True, recalc_message_size=True): + encoded_values = [] + for (offset, message_size, message) in items: + if isinstance(message, Message): + encoded_message = message.encode() + else: + encoded_message = cls.ITEM.fields[2].encode(message) + if recalc_message_size: + message_size = len(encoded_message) + encoded_values.append(cls.ITEM.fields[0].encode(offset)) + encoded_values.append(cls.ITEM.fields[1].encode(message_size)) + encoded_values.append(encoded_message) + encoded = b''.join(encoded_values) + if not size: + return encoded + return Int32.encode(len(encoded)) + encoded + + @classmethod + def decode(cls, data, bytes_to_read=None): + """Compressed messages should pass in bytes_to_read (via message size) + otherwise, we decode from data as Int32 + """ + if isinstance(data, bytes): + data = io.BytesIO(data) + if bytes_to_read is 
diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py
new file mode 100644
index 0000000..810f1b8
--- /dev/null
+++ b/kafka/protocol/metadata.py
@@ -0,0 +1,29 @@
+from .struct import Struct
+from .types import Array, Int16, Int32, Schema, String
+
+
+class MetadataResponse(Struct):
+    SCHEMA = Schema(
+        ('brokers', Array(
+            ('node_id', Int32),
+            ('host', String('utf-8')),
+            ('port', Int32))),
+        ('topics', Array(
+            ('error_code', Int16),
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('error_code', Int16),
+                ('partition', Int32),
+                ('leader', Int32),
+                ('replicas', Array(Int32)),
+                ('isr', Array(Int32))))))
+    )
+
+
+class MetadataRequest(Struct):
+    API_KEY = 3
+    API_VERSION = 0
+    RESPONSE_TYPE = MetadataResponse
+    SCHEMA = Schema(
+        ('topics', Array(String('utf-8')))
+    )
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
new file mode 100644
index 0000000..606f1f1
--- /dev/null
+++ b/kafka/protocol/offset.py
@@ -0,0 +1,36 @@
+from .struct import Struct
+from .types import Array, Int16, Int32, Int64, Schema, String
+
+class OffsetResetStrategy(object):
+    LATEST = -1
+    EARLIEST = -2
+    NONE = 0
+
+
+class OffsetResponse(Struct):
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('offsets', Array(Int64))))))
+    )
+
+
+class OffsetRequest(Struct):
+    API_KEY = 2
+    API_VERSION = 0
+    RESPONSE_TYPE = OffsetResponse
+    SCHEMA = Schema(
+        ('replica_id', Int32),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('time', Int64),
+                ('max_offsets', Int32)))))
+    )
+    DEFAULTS = {
+        'replica_id': -1
+    }
diff --git a/kafka/protocol/pickle.py b/kafka/protocol/pickle.py
new file mode 100644
index 0000000..b7e5264
--- /dev/null
+++ b/kafka/protocol/pickle.py
@@ -0,0 +1,29 @@
+from __future__ import absolute_import
+
+try:
+    import copyreg  # pylint: disable=import-error
+except ImportError:
+    import copy_reg as copyreg  # pylint: disable=import-error
+
+import types
+
+
+def _pickle_method(method):
+    func_name = method.im_func.__name__
+    obj = method.im_self
+    cls = method.im_class
+    return _unpickle_method, (func_name, obj, cls)
+
+
+def _unpickle_method(func_name, obj, cls):
+    for cls in cls.mro():
+        try:
+            func = cls.__dict__[func_name]
+        except KeyError:
+            pass
+        else:
+            break
+    return func.__get__(obj, cls)
+
+# https://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
+copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method)
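The copyreg hook exists because `Struct` instances store a bound method in their instance dict (`self.encode = self._encode_self`), which the stock pickler cannot handle on Python 2; `message.py` imports this module purely for the registration side effect. The reducer keys on `im_func`/`im_self`/`im_class`, which only exist on Python 2, so this sketch assumes a Python 2 interpreter (the class name is made up):

```python
import pickle

import kafka.protocol.pickle  # noqa: registering the reducer is the import side effect


class Example(object):
    def method(self):
        return 42


# Without the registration, pickling a bound method raises on Python 2
data = pickle.dumps(Example().method)
assert pickle.loads(data)() == 42
```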
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
new file mode 100644
index 0000000..ef2f96e
--- /dev/null
+++ b/kafka/protocol/produce.py
@@ -0,0 +1,29 @@
+from .message import MessageSet
+from .struct import Struct
+from .types import Int8, Int16, Int32, Int64, Bytes, String, Array, Schema
+
+
+class ProduceResponse(Struct):
+    SCHEMA = Schema(
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('error_code', Int16),
+                ('offset', Int64)))))
+    )
+
+
+class ProduceRequest(Struct):
+    API_KEY = 0
+    API_VERSION = 0
+    RESPONSE_TYPE = ProduceResponse
+    SCHEMA = Schema(
+        ('required_acks', Int16),
+        ('timeout', Int32),
+        ('topics', Array(
+            ('topic', String('utf-8')),
+            ('partitions', Array(
+                ('partition', Int32),
+                ('messages', MessageSet)))))
+    )
diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py
new file mode 100644
index 0000000..ca1013e
--- /dev/null
+++ b/kafka/protocol/struct.py
@@ -0,0 +1,64 @@
+#from collections import namedtuple
+from io import BytesIO
+
+from .abstract import AbstractType
+from .types import Schema
+
+
+class Struct(AbstractType):
+    SCHEMA = Schema()
+
+    def __init__(self, *args, **kwargs):
+        if len(args) == len(self.SCHEMA.fields):
+            for i, name in enumerate(self.SCHEMA.names):
+                self.__dict__[name] = args[i]
+        elif len(args) > 0:
+            raise ValueError('Args must be empty or mirror schema')
+        else:
+            self.__dict__.update(kwargs)
+
+        # overloading encode() to support both class and instance
+        self.encode = self._encode_self
+
+    @classmethod
+    def encode(cls, item):  # pylint: disable=E0202
+        bits = []
+        for i, field in enumerate(cls.SCHEMA.fields):
+            bits.append(field.encode(item[i]))
+        return b''.join(bits)
+
+    def _encode_self(self):
+        return self.SCHEMA.encode(
+            [self.__dict__[name] for name in self.SCHEMA.names]
+        )
+
+    @classmethod
+    def decode(cls, data):
+        if isinstance(data, bytes):
+            data = BytesIO(data)
+        return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
+
+    def __repr__(self):
+        key_vals = []
+        for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields):
+            key_vals.append('%s=%s' % (name, field.repr(self.__dict__[name])))
+        return self.__class__.__name__ + '(' + ', '.join(key_vals) + ')'
+
+    def __hash__(self):
+        return hash(self.encode())
+
+    def __eq__(self, other):
+        if self.SCHEMA != other.SCHEMA:
+            return False
+        for attr in self.SCHEMA.names:
+            if self.__dict__[attr] != other.__dict__[attr]:
+                return False
+        return True
+
+"""
+class MetaStruct(type):
+    def __new__(cls, clsname, bases, dct):
+        nt = namedtuple(clsname, [name for (name, _) in dct['SCHEMA']])
+        bases = tuple([Struct, nt] + list(bases))
+        return super(MetaStruct, cls).__new__(cls, clsname, bases, dct)
+"""
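`Struct` gives every schema'd class positional or keyword construction, class-level `decode`, instance-level `encode`, `repr`, and equality for free. A round-trip sketch using the MetadataRequest defined earlier (not part of the diff; the topic name is made up):

```python
from kafka.protocol.metadata import MetadataRequest

req = MetadataRequest(['example-topic'])   # positional args mirror SCHEMA order
blob = req.encode()                        # instance encode via _encode_self

same = MetadataRequest.decode(blob)        # class-level decode builds a new instance
assert req == same
```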
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py
new file mode 100644
index 0000000..01799bb
--- /dev/null
+++ b/kafka/protocol/types.py
@@ -0,0 +1,141 @@
+from __future__ import absolute_import
+
+from struct import pack, unpack
+
+from .abstract import AbstractType
+
+
+class Int8(AbstractType):
+    @classmethod
+    def encode(cls, value):
+        return pack('>b', value)
+
+    @classmethod
+    def decode(cls, data):
+        (value,) = unpack('>b', data.read(1))
+        return value
+
+
+class Int16(AbstractType):
+    @classmethod
+    def encode(cls, value):
+        return pack('>h', value)
+
+    @classmethod
+    def decode(cls, data):
+        (value,) = unpack('>h', data.read(2))
+        return value
+
+
+class Int32(AbstractType):
+    @classmethod
+    def encode(cls, value):
+        return pack('>i', value)
+
+    @classmethod
+    def decode(cls, data):
+        (value,) = unpack('>i', data.read(4))
+        return value
+
+
+class Int64(AbstractType):
+    @classmethod
+    def encode(cls, value):
+        return pack('>q', value)
+
+    @classmethod
+    def decode(cls, data):
+        (value,) = unpack('>q', data.read(8))
+        return value
+
+
+class String(AbstractType):
+    def __init__(self, encoding='utf-8'):
+        self.encoding = encoding
+
+    def encode(self, value):
+        if value is None:
+            return Int16.encode(-1)
+        value = str(value).encode(self.encoding)
+        return Int16.encode(len(value)) + value
+
+    def decode(self, data):
+        length = Int16.decode(data)
+        if length < 0:
+            return None
+        return data.read(length).decode(self.encoding)
+
+
+class Bytes(AbstractType):
+    @classmethod
+    def encode(cls, value):
+        if value is None:
+            return Int32.encode(-1)
+        else:
+            return Int32.encode(len(value)) + value
+
+    @classmethod
+    def decode(cls, data):
+        length = Int32.decode(data)
+        if length < 0:
+            return None
+        return data.read(length)
+
+
+class Schema(AbstractType):
+    def __init__(self, *fields):
+        if fields:
+            self.names, self.fields = zip(*fields)
+        else:
+            self.names, self.fields = (), ()
+
+    def encode(self, item):
+        if len(item) != len(self.fields):
+            raise ValueError('Item field count does not match Schema')
+        return b''.join([
+            field.encode(item[i])
+            for i, field in enumerate(self.fields)
+        ])
+
+    def decode(self, data):
+        return tuple([field.decode(data) for field in self.fields])
+
+    def __len__(self):
+        return len(self.fields)
+
+    def repr(self, value):
+        key_vals = []
+        try:
+            for i in range(len(self)):
+                try:
+                    field_val = getattr(value, self.names[i])
+                except AttributeError:
+                    field_val = value[i]
+                key_vals.append('%s=%s' % (self.names[i], self.fields[i].repr(field_val)))
+            return '(' + ', '.join(key_vals) + ')'
+        except:
+            return repr(value)
+
+
+class Array(AbstractType):
+    def __init__(self, *array_of):
+        if len(array_of) > 1:
+            self.array_of = Schema(*array_of)
+        elif len(array_of) == 1 and (isinstance(array_of[0], AbstractType) or
+                                     issubclass(array_of[0], AbstractType)):
+            self.array_of = array_of[0]
+        else:
+            raise ValueError('Array instantiated with no array_of type')
+
+    def encode(self, items):
+        return b''.join(
+            [Int32.encode(len(items))] +
+            [self.array_of.encode(item) for item in items]
+        )
+
+    def decode(self, data):
+        length = Int32.decode(data)
+        return [self.array_of.decode(data) for _ in range(length)]
+
+    def repr(self, list_of_items):
+        return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'
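Nullable fields ride on the sign of the length prefix: `String` uses a 16-bit length, `Bytes` a 32-bit one, and -1 means null in both. A quick check of the primitives (not part of the diff; values are illustrative):

```python
from io import BytesIO

from kafka.protocol.types import Array, Int16, String

s = String('utf-8')
assert s.encode(None) == Int16.encode(-1)          # null encodes as length -1
assert s.decode(BytesIO(s.encode('hi'))) == 'hi'   # round trip

arr = Array(String('utf-8'))
assert arr.decode(BytesIO(arr.encode(['a', 'b']))) == ['a', 'b']
```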