author    Dana Powers <dana.powers@rd.io>  2016-01-07 18:51:14 -0800
committer Dana Powers <dana.powers@rd.io>  2016-01-07 18:51:14 -0800
commit    828377377da43749af0d27ee256ef31bf714cf17
tree      fbad4d4381fc4d1ea2be7ce2009214d18fbeb674  /kafka/protocol
parent    71e7568fcb8132899f366b37c32645fd5a40dc4b
parent    9a8af1499ca425366d934487469d9977fae7fe5f
download  kafka-python-828377377da43749af0d27ee256ef31bf714cf17.tar.gz
Merge branch '0.9'

Conflicts:
    kafka/codec.py
    kafka/version.py
    test/test_producer.py
    test/test_producer_integration.py
Diffstat (limited to 'kafka/protocol')
-rw-r--r--   kafka/protocol/__init__.py     6
-rw-r--r--   kafka/protocol/abstract.py    17
-rw-r--r--   kafka/protocol/admin.py       44
-rw-r--r--   kafka/protocol/api.py         16
-rw-r--r--   kafka/protocol/commit.py     119
-rw-r--r--   kafka/protocol/fetch.py       32
-rw-r--r--   kafka/protocol/group.py      103
-rw-r--r--   kafka/protocol/legacy.py     440
-rw-r--r--   kafka/protocol/message.py    144
-rw-r--r--   kafka/protocol/metadata.py    29
-rw-r--r--   kafka/protocol/offset.py      36
-rw-r--r--   kafka/protocol/pickle.py      29
-rw-r--r--   kafka/protocol/produce.py     29
-rw-r--r--   kafka/protocol/struct.py      64
-rw-r--r--   kafka/protocol/types.py      141
15 files changed, 1249 insertions(+), 0 deletions(-)
diff --git a/kafka/protocol/__init__.py b/kafka/protocol/__init__.py
new file mode 100644
index 0000000..7b2a2f3
--- /dev/null
+++ b/kafka/protocol/__init__.py
@@ -0,0 +1,6 @@
+from .legacy import (
+ create_message, create_gzip_message,
+ create_snappy_message, create_message_set,
+ CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
+ ATTRIBUTE_CODEC_MASK, KafkaProtocol,
+)
diff --git a/kafka/protocol/abstract.py b/kafka/protocol/abstract.py
new file mode 100644
index 0000000..160678f
--- /dev/null
+++ b/kafka/protocol/abstract.py
@@ -0,0 +1,17 @@
+import abc
+
+
+class AbstractType(object):
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def encode(cls, value): # pylint: disable=no-self-argument
+ pass
+
+ @abc.abstractmethod
+ def decode(cls, data): # pylint: disable=no-self-argument
+ pass
+
+ @classmethod
+ def repr(cls, value):
+ return repr(value)
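
AbstractType is the interface every wire type in this package implements: a class-level encode/decode pair plus an optional repr. As a minimal sketch, a hypothetical Boolean type (not part of this commit, but mirroring the Int* types added later in this diff) would look like:

    import struct
    from kafka.protocol.abstract import AbstractType

    class Boolean(AbstractType):
        @classmethod
        def encode(cls, value):
            # single big-endian byte, 0x00 or 0x01
            return struct.pack('>?', value)

        @classmethod
        def decode(cls, data):
            # decode() always reads from a file-like object
            (value,) = struct.unpack('>?', data.read(1))
            return value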
diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py
new file mode 100644
index 0000000..56dd042
--- /dev/null
+++ b/kafka/protocol/admin.py
@@ -0,0 +1,44 @@
+from .struct import Struct
+from .types import Array, Bytes, Int16, Schema, String
+
+
+class ListGroupsResponse(Struct):
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('groups', Array(
+ ('group', String('utf-8')),
+ ('protocol_type', String('utf-8'))))
+ )
+
+
+class ListGroupsRequest(Struct):
+ API_KEY = 16
+ API_VERSION = 0
+ RESPONSE_TYPE = ListGroupsResponse
+ SCHEMA = Schema()
+
+
+class DescribeGroupsResponse(Struct):
+ SCHEMA = Schema(
+ ('groups', Array(
+ ('error_code', Int16),
+ ('group', String('utf-8')),
+ ('state', String('utf-8')),
+ ('protocol_type', String('utf-8')),
+ ('protocol', String('utf-8')),
+ ('members', Array(
+ ('member_id', String('utf-8')),
+ ('client_id', String('utf-8')),
+ ('client_host', String('utf-8')),
+ ('member_metadata', Bytes),
+ ('member_assignment', Bytes)))))
+ )
+
+
+class DescribeGroupsRequest(Struct):
+ API_KEY = 15
+ API_VERSION = 0
+ RESPONSE_TYPE = DescribeGroupsResponse
+ SCHEMA = Schema(
+ ('groups', Array(String('utf-8')))
+ )
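
A short usage sketch (the group name is illustrative): ListGroups carries no fields at all, while DescribeGroups takes the list of group names to look up.

    from kafka.protocol.admin import DescribeGroupsRequest, ListGroupsRequest

    list_req = ListGroupsRequest()                          # empty schema, no arguments
    describe_req = DescribeGroupsRequest(groups=['my-group'])
    payload = describe_req.encode()                         # Array(String) -> length-prefixed bytes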
diff --git a/kafka/protocol/api.py b/kafka/protocol/api.py
new file mode 100644
index 0000000..0c23437
--- /dev/null
+++ b/kafka/protocol/api.py
@@ -0,0 +1,16 @@
+from .struct import Struct
+from .types import Int16, Int32, String, Schema
+
+
+class RequestHeader(Struct):
+ SCHEMA = Schema(
+ ('api_key', Int16),
+ ('api_version', Int16),
+ ('correlation_id', Int32),
+ ('client_id', String('utf-8'))
+ )
+
+ def __init__(self, request, correlation_id=0, client_id='kafka-python'):
+ super(RequestHeader, self).__init__(
+ request.API_KEY, request.API_VERSION, correlation_id, client_id
+ )
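
Illustrative sketch of how RequestHeader composes with a request body. The Int32 size prefix shown on the last line is added by the connection layer, which is outside this diff, so treat that framing as an assumption for illustration only; the topic name and correlation id are likewise illustrative.

    from kafka.protocol.api import RequestHeader
    from kafka.protocol.metadata import MetadataRequest
    from kafka.protocol.types import Int32

    request = MetadataRequest(['my-topic'])
    header = RequestHeader(request, correlation_id=1, client_id='kafka-python')
    message = header.encode() + request.encode()
    frame = Int32.encode(len(message)) + message            # size-prefixed bytes on the wire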
diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py
new file mode 100644
index 0000000..a32f8d3
--- /dev/null
+++ b/kafka/protocol/commit.py
@@ -0,0 +1,119 @@
+from .struct import Struct
+from .types import Array, Int16, Int32, Int64, Schema, String
+
+
+class OffsetCommitResponse(Struct):
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16)))))
+ )
+
+
+class OffsetCommitRequest_v2(Struct):
+ API_KEY = 8
+ API_VERSION = 2 # added retention_time, dropped timestamp
+ RESPONSE_TYPE = OffsetCommitResponse
+ SCHEMA = Schema(
+ ('consumer_group', String('utf-8')),
+ ('consumer_group_generation_id', Int32),
+ ('consumer_id', String('utf-8')),
+ ('retention_time', Int64),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('offset', Int64),
+ ('metadata', String('utf-8'))))))
+ )
+ DEFAULT_GENERATION_ID = -1
+ DEFAULT_RETENTION_TIME = -1
+
+
+class OffsetCommitRequest_v1(Struct):
+ API_KEY = 8
+ API_VERSION = 1 # Kafka-backed storage
+ RESPONSE_TYPE = OffsetCommitResponse
+ SCHEMA = Schema(
+ ('consumer_group', String('utf-8')),
+ ('consumer_group_generation_id', Int32),
+ ('consumer_id', String('utf-8')),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('offset', Int64),
+ ('timestamp', Int64),
+ ('metadata', String('utf-8'))))))
+ )
+
+
+class OffsetCommitRequest_v0(Struct):
+ API_KEY = 8
+ API_VERSION = 0 # Zookeeper-backed storage
+ RESPONSE_TYPE = OffsetCommitResponse
+ SCHEMA = Schema(
+ ('consumer_group', String('utf-8')),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('offset', Int64),
+ ('metadata', String('utf-8'))))))
+ )
+
+
+class OffsetFetchResponse(Struct):
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('offset', Int64),
+ ('metadata', String('utf-8')),
+ ('error_code', Int16)))))
+ )
+
+
+class OffsetFetchRequest_v1(Struct):
+ API_KEY = 9
+ API_VERSION = 1 # kafka-backed storage
+ RESPONSE_TYPE = OffsetFetchResponse
+ SCHEMA = Schema(
+ ('consumer_group', String('utf-8')),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(Int32))))
+ )
+
+
+class OffsetFetchRequest_v0(Struct):
+ API_KEY = 9
+ API_VERSION = 0 # zookeeper-backed storage
+ RESPONSE_TYPE = OffsetFetchResponse
+ SCHEMA = Schema(
+ ('consumer_group', String('utf-8')),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(Int32))))
+ )
+
+
+class GroupCoordinatorResponse(Struct):
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('coordinator_id', Int32),
+ ('host', String('utf-8')),
+ ('port', Int32)
+ )
+
+
+class GroupCoordinatorRequest(Struct):
+ API_KEY = 10
+ API_VERSION = 0
+ RESPONSE_TYPE = GroupCoordinatorResponse
+ SCHEMA = Schema(
+ ('consumer_group', String('utf-8'))
+ )
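
A hedged sketch of the v2 commit path plus the coordinator lookup that normally precedes it (group name, topic, partition, and offset values are illustrative):

    from kafka.protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest_v2

    coordinator_req = GroupCoordinatorRequest(consumer_group='my-group')

    commit_req = OffsetCommitRequest_v2(
        consumer_group='my-group',
        consumer_group_generation_id=OffsetCommitRequest_v2.DEFAULT_GENERATION_ID,
        consumer_id='',
        retention_time=OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
        topics=[('my-topic', [(0, 42, '')])])               # (partition, offset, metadata)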
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
new file mode 100644
index 0000000..e00c9ab
--- /dev/null
+++ b/kafka/protocol/fetch.py
@@ -0,0 +1,32 @@
+from .message import MessageSet
+from .struct import Struct
+from .types import Array, Int16, Int32, Int64, Schema, String
+
+
+class FetchResponse(Struct):
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('highwater_offset', Int64),
+ ('message_set', MessageSet)))))
+ )
+
+
+class FetchRequest(Struct):
+ API_KEY = 1
+ API_VERSION = 0
+ RESPONSE_TYPE = FetchResponse
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('max_wait_time', Int32),
+ ('min_bytes', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('offset', Int64),
+ ('max_bytes', Int32)))))
+ )
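
A sketch of building a fetch for one partition, mirroring the v0 schema above (topic, offset, and byte limits are illustrative):

    from kafka.protocol.fetch import FetchRequest

    request = FetchRequest(
        replica_id=-1,             # -1 marks an ordinary consumer, not a broker replica
        max_wait_time=100,         # ms to wait for min_bytes to accumulate
        min_bytes=1024,
        topics=[('my-topic', [(0, 123, 1048576)])])   # (partition, offset, max_bytes)
    raw = request.encode()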
diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py
new file mode 100644
index 0000000..72de005
--- /dev/null
+++ b/kafka/protocol/group.py
@@ -0,0 +1,103 @@
+from .struct import Struct
+from .types import Array, Bytes, Int16, Int32, Schema, String
+
+
+class JoinGroupResponse(Struct):
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('generation_id', Int32),
+ ('group_protocol', String('utf-8')),
+ ('leader_id', String('utf-8')),
+ ('member_id', String('utf-8')),
+ ('members', Array(
+ ('member_id', String('utf-8')),
+ ('member_metadata', Bytes)))
+ )
+
+
+class JoinGroupRequest(Struct):
+ API_KEY = 11
+ API_VERSION = 0
+ RESPONSE_TYPE = JoinGroupResponse
+ SCHEMA = Schema(
+ ('group', String('utf-8')),
+ ('session_timeout', Int32),
+ ('member_id', String('utf-8')),
+ ('protocol_type', String('utf-8')),
+ ('group_protocols', Array(
+ ('protocol_name', String('utf-8')),
+ ('protocol_metadata', Bytes)))
+ )
+ UNKNOWN_MEMBER_ID = ''
+
+
+class ProtocolMetadata(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('subscription', Array(String('utf-8'))), # topics list
+ ('user_data', Bytes)
+ )
+
+
+class SyncGroupResponse(Struct):
+ SCHEMA = Schema(
+ ('error_code', Int16),
+ ('member_assignment', Bytes)
+ )
+
+
+class SyncGroupRequest(Struct):
+ API_KEY = 14
+ API_VERSION = 0
+ RESPONSE_TYPE = SyncGroupResponse
+ SCHEMA = Schema(
+ ('group', String('utf-8')),
+ ('generation_id', Int32),
+ ('member_id', String('utf-8')),
+ ('group_assignment', Array(
+ ('member_id', String('utf-8')),
+ ('member_metadata', Bytes)))
+ )
+
+
+class MemberAssignment(Struct):
+ SCHEMA = Schema(
+ ('version', Int16),
+ ('partition_assignment', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(Int32)))),
+ ('user_data', Bytes)
+ )
+
+
+class HeartbeatResponse(Struct):
+ SCHEMA = Schema(
+ ('error_code', Int16)
+ )
+
+
+class HeartbeatRequest(Struct):
+ API_KEY = 12
+ API_VERSION = 0
+ RESPONSE_TYPE = HeartbeatResponse
+ SCHEMA = Schema(
+ ('group', String('utf-8')),
+ ('generation_id', Int32),
+ ('member_id', String('utf-8'))
+ )
+
+
+class LeaveGroupResponse(Struct):
+ SCHEMA = Schema(
+ ('error_code', Int16)
+ )
+
+
+class LeaveGroupRequest(Struct):
+ API_KEY = 13
+ API_VERSION = 0
+ RESPONSE_TYPE = LeaveGroupResponse
+ SCHEMA = Schema(
+ ('group', String('utf-8')),
+ ('member_id', String('utf-8'))
+ )
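
ProtocolMetadata and MemberAssignment are nested Structs that are encoded separately and carried in the opaque Bytes fields of the join/sync requests. A sketch with illustrative group, topic, and member names (in practice the member_id and generation_id come back in the JoinGroup response):

    from kafka.protocol.group import (
        JoinGroupRequest, MemberAssignment, ProtocolMetadata, SyncGroupRequest)

    metadata = ProtocolMetadata(version=0, subscription=['my-topic'], user_data=b'')
    join = JoinGroupRequest(
        group='my-group',
        session_timeout=30000,
        member_id=JoinGroupRequest.UNKNOWN_MEMBER_ID,
        protocol_type='consumer',
        group_protocols=[('roundrobin', metadata.encode())])

    assignment = MemberAssignment(
        version=0,
        partition_assignment=[('my-topic', [0, 1, 2])],
        user_data=b'')
    sync = SyncGroupRequest(
        group='my-group',
        generation_id=1,
        member_id='member-1',
        group_assignment=[('member-1', assignment.encode())])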
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
new file mode 100644
index 0000000..1835521
--- /dev/null
+++ b/kafka/protocol/legacy.py
@@ -0,0 +1,440 @@
+from __future__ import absolute_import
+
+import logging
+import struct
+
+import six
+
+from six.moves import xrange
+
+import kafka.common
+import kafka.protocol.commit
+import kafka.protocol.fetch
+import kafka.protocol.message
+import kafka.protocol.metadata
+import kafka.protocol.offset
+import kafka.protocol.produce
+
+from kafka.codec import (
+ gzip_encode, gzip_decode, snappy_encode, snappy_decode
+)
+from kafka.common import (
+ ProtocolError, ChecksumError,
+ UnsupportedCodecError,
+ ConsumerMetadataResponse
+)
+from kafka.util import (
+ crc32, read_short_string, read_int_string, relative_unpack,
+ write_short_string, write_int_string, group_by_topic_and_partition
+)
+
+
+log = logging.getLogger(__name__)
+
+ATTRIBUTE_CODEC_MASK = 0x03
+CODEC_NONE = 0x00
+CODEC_GZIP = 0x01
+CODEC_SNAPPY = 0x02
+ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
+
+
+class KafkaProtocol(object):
+ """
+ Class to encapsulate all of the protocol encoding/decoding.
+ This class has no state associated with it; it exists purely
+ for organization.
+ """
+ PRODUCE_KEY = 0
+ FETCH_KEY = 1
+ OFFSET_KEY = 2
+ METADATA_KEY = 3
+ OFFSET_COMMIT_KEY = 8
+ OFFSET_FETCH_KEY = 9
+ CONSUMER_METADATA_KEY = 10
+
+ ###################
+ # Private API #
+ ###################
+
+ @classmethod
+ def _encode_message_header(cls, client_id, correlation_id, request_key,
+ version=0):
+ """
+ Encode the common request envelope
+ """
+ return struct.pack('>hhih%ds' % len(client_id),
+ request_key, # ApiKey
+ version, # ApiVersion
+ correlation_id, # CorrelationId
+ len(client_id), # ClientId size
+ client_id) # ClientId
+
+ @classmethod
+ def _encode_message_set(cls, messages):
+ """
+ Encode a MessageSet. Unlike other arrays in the protocol,
+ MessageSets are not length-prefixed
+
+ Format
+ ======
+ MessageSet => [Offset MessageSize Message]
+ Offset => int64
+ MessageSize => int32
+ """
+ message_set = []
+ for message in messages:
+ encoded_message = KafkaProtocol._encode_message(message)
+ message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
+ len(encoded_message),
+ encoded_message))
+ return b''.join(message_set)
+
+ @classmethod
+ def _encode_message(cls, message):
+ """
+ Encode a single message.
+
+ The magic number of a message is a format version number.
+ The only supported magic number right now is zero
+
+ Format
+ ======
+ Message => Crc MagicByte Attributes Key Value
+ Crc => int32
+ MagicByte => int8
+ Attributes => int8
+ Key => bytes
+ Value => bytes
+ """
+ if message.magic == 0:
+ msg = b''.join([
+ struct.pack('>BB', message.magic, message.attributes),
+ write_int_string(message.key),
+ write_int_string(message.value)
+ ])
+ crc = crc32(msg)
+ msg = struct.pack('>i%ds' % len(msg), crc, msg)
+ else:
+ raise ProtocolError("Unexpected magic number: %d" % message.magic)
+ return msg
+
+ ##################
+ # Public API #
+ ##################
+
+ @classmethod
+ def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
+ """
+ Encode a ProduceRequest struct
+
+ Arguments:
+ payloads: list of ProduceRequestPayload
+ acks: how many broker acknowledgments the producer requires:
+ 1: the leader has written the message to its local log
+ 0: no acknowledgment is required; the request returns immediately
+ -1: wait until all in-sync replicas have acknowledged
+ timeout: Maximum time (in ms) the server will wait for replica acks.
+ This is _not_ a socket timeout
+
+ Returns: ProduceRequest
+ """
+ if acks not in (1, 0, -1):
+ raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
+
+ return kafka.protocol.produce.ProduceRequest(
+ required_acks=acks,
+ timeout=timeout,
+ topics=[(
+ topic,
+ [(
+ partition,
+ [(0, 0, kafka.protocol.message.Message(msg.value, key=msg.key,
+ magic=msg.magic,
+ attributes=msg.attributes))
+ for msg in payload.messages])
+ for partition, payload in topic_payloads.items()])
+ for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
+
+ @classmethod
+ def decode_produce_response(cls, response):
+ """
+ Decode ProduceResponse to ProduceResponsePayload
+
+ Arguments:
+ response: ProduceResponse
+
+ Return: list of ProduceResponsePayload
+ """
+ return [
+ kafka.common.ProduceResponsePayload(topic, partition, error, offset)
+ for topic, partitions in response.topics
+ for partition, error, offset in partitions
+ ]
+
+ @classmethod
+ def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
+ """
+ Encodes a FetchRequest struct
+
+ Arguments:
+ payloads: list of FetchRequestPayload
+ max_wait_time (int, optional): ms to block waiting for min_bytes
+ data. Defaults to 100.
+ min_bytes (int, optional): minimum bytes required to return before
+ max_wait_time. Defaults to 4096.
+
+ Return: FetchRequest
+ """
+ return kafka.protocol.fetch.FetchRequest(
+ replica_id=-1,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes,
+ topics=[(
+ topic,
+ [(
+ partition,
+ payload.offset,
+ payload.max_bytes)
+ for partition, payload in topic_payloads.items()])
+ for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
+
+ @classmethod
+ def decode_fetch_response(cls, response):
+ """
+ Decode FetchResponse struct to FetchResponsePayloads
+
+ Arguments:
+ response: FetchResponse
+ """
+ return [
+ kafka.common.FetchResponsePayload(
+ topic, partition, error, highwater_offset, [
+ kafka.common.OffsetAndMessage(offset, message)
+ for offset, _, message in messages])
+ for topic, partitions in response.topics
+ for partition, error, highwater_offset, messages in partitions
+ ]
+
+ @classmethod
+ def encode_offset_request(cls, payloads=()):
+ return kafka.protocol.offset.OffsetRequest(
+ replica_id=-1,
+ topics=[(
+ topic,
+ [(
+ partition,
+ payload.time,
+ payload.max_offsets)
+ for partition, payload in six.iteritems(topic_payloads)])
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+
+ @classmethod
+ def decode_offset_response(cls, response):
+ """
+ Decode OffsetResponse into OffsetResponsePayloads
+
+ Arguments:
+ response: OffsetResponse
+
+ Returns: list of OffsetResponsePayloads
+ """
+ return [
+ kafka.common.OffsetResponsePayload(topic, partition, error, tuple(offsets))
+ for topic, partitions in response.topics
+ for partition, error, offsets in partitions
+ ]
+
+ @classmethod
+ def encode_metadata_request(cls, topics=(), payloads=None):
+ """
+ Encode a MetadataRequest
+
+ Arguments:
+ topics: list of strings
+ """
+ if payloads is not None:
+ topics = payloads
+
+ return kafka.protocol.metadata.MetadataRequest(topics)
+
+ @classmethod
+ def decode_metadata_response(cls, response):
+ return response
+
+ @classmethod
+ def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
+ """
+ Encode a ConsumerMetadataRequest
+
+ Arguments:
+ client_id: string
+ correlation_id: int
+ payloads: string (consumer group)
+ """
+ message = []
+ message.append(cls._encode_message_header(client_id, correlation_id,
+ KafkaProtocol.CONSUMER_METADATA_KEY))
+ message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
+
+ msg = b''.join(message)
+ return write_int_string(msg)
+
+ @classmethod
+ def decode_consumer_metadata_response(cls, data):
+ """
+ Decode bytes to a ConsumerMetadataResponse
+
+ Arguments:
+ data: bytes to decode
+ """
+ ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
+ (host, cur) = read_short_string(data, cur)
+ ((port,), cur) = relative_unpack('>i', data, cur)
+
+ return ConsumerMetadataResponse(error, nodeId, host, port)
+
+ @classmethod
+ def encode_offset_commit_request(cls, group, payloads):
+ """
+ Encode an OffsetCommitRequest struct
+
+ Arguments:
+ group: string, the consumer group you are committing offsets for
+ payloads: list of OffsetCommitRequestPayload
+ """
+ return kafka.protocol.commit.OffsetCommitRequest_v0(
+ consumer_group=group,
+ topics=[(
+ topic,
+ [(
+ partition,
+ payload.offset,
+ payload.metadata)
+ for partition, payload in six.iteritems(topic_payloads)])
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+
+
+ @classmethod
+ def decode_offset_commit_response(cls, response):
+ """
+ Decode OffsetCommitResponse to an OffsetCommitResponsePayload
+
+ Arguments:
+ response: OffsetCommitResponse
+ """
+ return [
+ kafka.common.OffsetCommitResponsePayload(topic, partition, error)
+ for topic, partitions in response.topics
+ for partition, error in partitions
+ ]
+
+ @classmethod
+ def encode_offset_fetch_request(cls, group, payloads, from_kafka=False):
+ """
+ Encode an OffsetFetchRequest struct. The request is encoded using
+ version 0 if from_kafka is false, indicating a request for Zookeeper
+ offsets. It is encoded using version 1 otherwise, indicating a request
+ for Kafka offsets.
+
+ Arguments:
+ group: string, the consumer group you are fetching offsets for
+ payloads: list of OffsetFetchRequestPayload
+ from_kafka: bool, default False, set True for Kafka-committed offsets
+ """
+ if from_kafka:
+ request_class = kafka.protocol.commit.OffsetFetchRequest_v1
+ else:
+ request_class = kafka.protocol.commit.OffsetFetchRequest_v0
+
+ return request_class(
+ consumer_group=group,
+ topics=[(
+ topic,
+ list(topic_payloads.keys()))
+ for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
+
+ @classmethod
+ def decode_offset_fetch_response(cls, response):
+ """
+ Decode OffsetFetchResponse to OffsetFetchResponsePayloads
+
+ Arguments:
+ response: OffsetFetchResponse
+ """
+ return [
+ kafka.common.OffsetFetchResponsePayload(
+ topic, partition, offset, metadata, error
+ )
+ for topic, partitions in response.topics
+ for partition, offset, metadata, error in partitions
+ ]
+
+
+def create_message(payload, key=None):
+ """
+ Construct a Message
+
+ Arguments:
+ payload: bytes, the payload to send to Kafka
+ key: bytes, a key used for partition routing (optional)
+
+ """
+ return kafka.common.Message(0, 0, key, payload)
+
+
+def create_gzip_message(payloads, key=None, compresslevel=None):
+ """
+ Construct a Gzipped Message containing multiple Messages
+
+ The given payloads will be encoded, compressed, and sent as a single atomic
+ message to Kafka.
+
+ Arguments:
+ payloads: list(bytes), a list of payloads to be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+
+ """
+ message_set = KafkaProtocol._encode_message_set(
+ [create_message(payload, pl_key) for payload, pl_key in payloads])
+
+ gzipped = gzip_encode(message_set, compresslevel=compresslevel)
+ codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
+
+ return kafka.common.Message(0, 0x00 | codec, key, gzipped)
+
+
+def create_snappy_message(payloads, key=None):
+ """
+ Construct a Snappy Message containing multiple Messages
+
+ The given payloads will be encoded, compressed, and sent as a single atomic
+ message to Kafka.
+
+ Arguments:
+ payloads: list(bytes), a list of payloads to be sent to Kafka
+ key: bytes, a key used for partition routing (optional)
+
+ """
+ message_set = KafkaProtocol._encode_message_set(
+ [create_message(payload, pl_key) for payload, pl_key in payloads])
+
+ snapped = snappy_encode(message_set)
+ codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
+
+ return kafka.common.Message(0, 0x00 | codec, key, snapped)
+
+
+def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
+ """Create a message set using the given codec.
+
+ If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
+ return a list containing a single codec-encoded message.
+ """
+ if codec == CODEC_NONE:
+ return [create_message(m, k) for m, k in messages]
+ elif codec == CODEC_GZIP:
+ return [create_gzip_message(messages, key, compresslevel)]
+ elif codec == CODEC_SNAPPY:
+ return [create_snappy_message(messages, key)]
+ else:
+ raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
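
A sketch of the codec helpers defined above (payload bytes are illustrative; the snappy path behaves the same way when python-snappy is installed):

    from kafka.protocol.legacy import CODEC_GZIP, CODEC_NONE, create_message_set

    msgs = [(b'msg-1', None), (b'msg-2', None)]             # (payload, key) pairs

    plain = create_message_set(msgs, codec=CODEC_NONE)
    assert len(plain) == 2        # one kafka.common.Message per payload

    gzipped = create_message_set(msgs, codec=CODEC_GZIP)
    assert len(gzipped) == 1      # a single Message wrapping the compressed inner set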
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
new file mode 100644
index 0000000..2648e24
--- /dev/null
+++ b/kafka/protocol/message.py
@@ -0,0 +1,144 @@
+import io
+
+from ..codec import gzip_decode, snappy_decode
+from . import pickle
+from .struct import Struct
+from .types import (
+ Int8, Int32, Int64, Bytes, Schema, AbstractType
+)
+from ..util import crc32
+
+
+class Message(Struct):
+ SCHEMA = Schema(
+ ('crc', Int32),
+ ('magic', Int8),
+ ('attributes', Int8),
+ ('key', Bytes),
+ ('value', Bytes)
+ )
+ CODEC_MASK = 0x03
+ CODEC_GZIP = 0x01
+ CODEC_SNAPPY = 0x02
+
+ def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
+ assert value is None or isinstance(value, bytes), 'value must be bytes'
+ assert key is None or isinstance(key, bytes), 'key must be bytes'
+ self.crc = crc
+ self.magic = magic
+ self.attributes = attributes
+ self.key = key
+ self.value = value
+ self.encode = self._encode_self
+
+ def _encode_self(self, recalc_crc=True):
+ message = Message.SCHEMA.encode(
+ (self.crc, self.magic, self.attributes, self.key, self.value)
+ )
+ if not recalc_crc:
+ return message
+ self.crc = crc32(message[4:])
+ return self.SCHEMA.fields[0].encode(self.crc) + message[4:]
+
+ @classmethod
+ def decode(cls, data):
+ if isinstance(data, bytes):
+ data = io.BytesIO(data)
+ fields = [field.decode(data) for field in cls.SCHEMA.fields]
+ return cls(fields[4], key=fields[3],
+ magic=fields[1], attributes=fields[2], crc=fields[0])
+
+ def validate_crc(self):
+ raw_msg = self._encode_self(recalc_crc=False)
+ crc = crc32(raw_msg[4:])
+ if crc == self.crc:
+ return True
+ return False
+
+ def is_compressed(self):
+ return self.attributes & self.CODEC_MASK != 0
+
+ def decompress(self):
+ codec = self.attributes & self.CODEC_MASK
+ assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY)
+ if codec == self.CODEC_GZIP:
+ raw_bytes = gzip_decode(self.value)
+ else:
+ raw_bytes = snappy_decode(self.value)
+
+ return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))
+
+ def __hash__(self):
+ return hash(self._encode_self(recalc_crc=False))
+
+
+class PartialMessage(bytes):
+ def __repr__(self):
+ return 'PartialMessage(%s)' % self
+
+
+class MessageSet(AbstractType):
+ ITEM = Schema(
+ ('offset', Int64),
+ ('message_size', Int32),
+ ('message', Message.SCHEMA)
+ )
+
+ @classmethod
+ def encode(cls, items, size=True, recalc_message_size=True):
+ encoded_values = []
+ for (offset, message_size, message) in items:
+ if isinstance(message, Message):
+ encoded_message = message.encode()
+ else:
+ encoded_message = cls.ITEM.fields[2].encode(message)
+ if recalc_message_size:
+ message_size = len(encoded_message)
+ encoded_values.append(cls.ITEM.fields[0].encode(offset))
+ encoded_values.append(cls.ITEM.fields[1].encode(message_size))
+ encoded_values.append(encoded_message)
+ encoded = b''.join(encoded_values)
+ if not size:
+ return encoded
+ return Int32.encode(len(encoded)) + encoded
+
+ @classmethod
+ def decode(cls, data, bytes_to_read=None):
+ """Compressed messages should pass in bytes_to_read (via message size)
+ otherwise, we decode from data as Int32
+ """
+ if isinstance(data, bytes):
+ data = io.BytesIO(data)
+ if bytes_to_read is None:
+ bytes_to_read = Int32.decode(data)
+ items = []
+
+ # We need at least 8 + 4 + 14 bytes to read offset + message size + message
+ # (14 bytes is a message w/ null key and null value)
+ while bytes_to_read >= 26:
+ offset = Int64.decode(data)
+ bytes_to_read -= 8
+
+ message_size = Int32.decode(data)
+ bytes_to_read -= 4
+
+ # if FetchRequest max_bytes is smaller than the available message set
+ # the server returns partial data for the final message
+ if message_size > bytes_to_read:
+ break
+
+ message = Message.decode(data)
+ bytes_to_read -= message_size
+
+ items.append((offset, message_size, message))
+
+ # If any bytes are left over, clear them from the buffer
+ # and append a PartialMessage to signal that max_bytes may be too small
+ if bytes_to_read:
+ items.append((None, None, PartialMessage(data.read(bytes_to_read))))
+
+ return items
+
+ @classmethod
+ def repr(cls, messages):
+ return '[' + ', '.join([cls.ITEM.repr(m) for m in messages]) + ']'
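
A round-trip sketch for the Message and MessageSet types above (key and value bytes are illustrative):

    from kafka.protocol.message import Message, MessageSet

    msg = Message(b'value', key=b'key')
    raw = msg.encode()                        # recalculates the CRC and prepends it
    decoded = Message.decode(raw)
    assert decoded.validate_crc()
    assert decoded.value == b'value'

    # MessageSet items are (offset, message_size, message); sizes are recalculated on encode
    raw_set = MessageSet.encode([(0, 0, msg)])
    items = MessageSet.decode(raw_set)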
diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py
new file mode 100644
index 0000000..810f1b8
--- /dev/null
+++ b/kafka/protocol/metadata.py
@@ -0,0 +1,29 @@
+from .struct import Struct
+from .types import Array, Int16, Int32, Schema, String
+
+
+class MetadataResponse(Struct):
+ SCHEMA = Schema(
+ ('brokers', Array(
+ ('node_id', Int32),
+ ('host', String('utf-8')),
+ ('port', Int32))),
+ ('topics', Array(
+ ('error_code', Int16),
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('error_code', Int16),
+ ('partition', Int32),
+ ('leader', Int32),
+ ('replicas', Array(Int32)),
+ ('isr', Array(Int32))))))
+ )
+
+
+class MetadataRequest(Struct):
+ API_KEY = 3
+ API_VERSION = 0
+ RESPONSE_TYPE = MetadataResponse
+ SCHEMA = Schema(
+ ('topics', Array(String('utf-8')))
+ )
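
Usage sketch (topic names are illustrative); an empty topic list asks the broker to return metadata for all topics:

    from kafka.protocol.metadata import MetadataRequest

    all_topics = MetadataRequest([])
    some_topics = MetadataRequest(['my-topic', 'other-topic'])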
diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py
new file mode 100644
index 0000000..606f1f1
--- /dev/null
+++ b/kafka/protocol/offset.py
@@ -0,0 +1,36 @@
+from .struct import Struct
+from .types import Array, Int16, Int32, Int64, Schema, String
+
+class OffsetResetStrategy(object):
+ LATEST = -1
+ EARLIEST = -2
+ NONE = 0
+
+
+class OffsetResponse(Struct):
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('offsets', Array(Int64))))))
+ )
+
+
+class OffsetRequest(Struct):
+ API_KEY = 2
+ API_VERSION = 0
+ RESPONSE_TYPE = OffsetResponse
+ SCHEMA = Schema(
+ ('replica_id', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('time', Int64),
+ ('max_offsets', Int32)))))
+ )
+ DEFAULTS = {
+ 'replica_id': -1
+ }
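
A sketch of an offsets query using the strategy constants above (topic and partition are illustrative):

    from kafka.protocol.offset import OffsetRequest, OffsetResetStrategy

    request = OffsetRequest(
        replica_id=-1,
        topics=[('my-topic', [(0, OffsetResetStrategy.LATEST, 1)])])  # (partition, time, max_offsets)
    raw = request.encode()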
diff --git a/kafka/protocol/pickle.py b/kafka/protocol/pickle.py
new file mode 100644
index 0000000..b7e5264
--- /dev/null
+++ b/kafka/protocol/pickle.py
@@ -0,0 +1,29 @@
+from __future__ import absolute_import
+
+try:
+ import copyreg # pylint: disable=import-error
+except ImportError:
+ import copy_reg as copyreg # pylint: disable=import-error
+
+import types
+
+
+def _pickle_method(method):
+ func_name = method.im_func.__name__
+ obj = method.im_self
+ cls = method.im_class
+ return _unpickle_method, (func_name, obj, cls)
+
+
+def _unpickle_method(func_name, obj, cls):
+ for cls in cls.mro():
+ try:
+ func = cls.__dict__[func_name]
+ except KeyError:
+ pass
+ else:
+ break
+ return func.__get__(obj, cls)
+
+# https://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
+copyreg.pickle(types.MethodType, _pickle_method, _unpickle_method)
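
Under Python 2, bound methods are not picklable by default; the copyreg registration above is what makes a sketch like the following work (Example is hypothetical and must live at module level so it can be re-imported on unpickling):

    import pickle
    import kafka.protocol.pickle  # registers the copyreg hook as an import side effect

    class Example(object):
        def method(self):
            return 'ok'

    restored = pickle.loads(pickle.dumps(Example().method))
    assert restored() == 'ok'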
diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py
new file mode 100644
index 0000000..ef2f96e
--- /dev/null
+++ b/kafka/protocol/produce.py
@@ -0,0 +1,29 @@
+from .message import MessageSet
+from .struct import Struct
+from .types import Int8, Int16, Int32, Int64, Bytes, String, Array, Schema
+
+
+class ProduceResponse(Struct):
+ SCHEMA = Schema(
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('error_code', Int16),
+ ('offset', Int64)))))
+ )
+
+
+class ProduceRequest(Struct):
+ API_KEY = 0
+ API_VERSION = 0
+ RESPONSE_TYPE = ProduceResponse
+ SCHEMA = Schema(
+ ('required_acks', Int16),
+ ('timeout', Int32),
+ ('topics', Array(
+ ('topic', String('utf-8')),
+ ('partitions', Array(
+ ('partition', Int32),
+ ('messages', MessageSet)))))
+ )
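
A sketch of a single-message produce (topic, partition, and payload are illustrative). Each MessageSet item is an (offset, message_size, Message) tuple; the size is recalculated during encoding, so placeholder zeros are fine:

    from kafka.protocol.message import Message
    from kafka.protocol.produce import ProduceRequest

    request = ProduceRequest(
        required_acks=1,
        timeout=1000,
        topics=[('my-topic', [(0, [(0, 0, Message(b'hello'))])])])
    raw = request.encode()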
diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py
new file mode 100644
index 0000000..ca1013e
--- /dev/null
+++ b/kafka/protocol/struct.py
@@ -0,0 +1,64 @@
+#from collections import namedtuple
+from io import BytesIO
+
+from .abstract import AbstractType
+from .types import Schema
+
+
+class Struct(AbstractType):
+ SCHEMA = Schema()
+
+ def __init__(self, *args, **kwargs):
+ if len(args) == len(self.SCHEMA.fields):
+ for i, name in enumerate(self.SCHEMA.names):
+ self.__dict__[name] = args[i]
+ elif len(args) > 0:
+ raise ValueError('Args must be empty or mirror schema')
+ else:
+ self.__dict__.update(kwargs)
+
+ # overloading encode() to support both class and instance
+ self.encode = self._encode_self
+
+ @classmethod
+ def encode(cls, item): # pylint: disable=E0202
+ bits = []
+ for i, field in enumerate(cls.SCHEMA.fields):
+ bits.append(field.encode(item[i]))
+ return b''.join(bits)
+
+ def _encode_self(self):
+ return self.SCHEMA.encode(
+ [self.__dict__[name] for name in self.SCHEMA.names]
+ )
+
+ @classmethod
+ def decode(cls, data):
+ if isinstance(data, bytes):
+ data = BytesIO(data)
+ return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
+
+ def __repr__(self):
+ key_vals = []
+ for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields):
+ key_vals.append('%s=%s' % (name, field.repr(self.__dict__[name])))
+ return self.__class__.__name__ + '(' + ', '.join(key_vals) + ')'
+
+ def __hash__(self):
+ return hash(self.encode())
+
+ def __eq__(self, other):
+ if self.SCHEMA != other.SCHEMA:
+ return False
+ for attr in self.SCHEMA.names:
+ if self.__dict__[attr] != other.__dict__[attr]:
+ return False
+ return True
+
+"""
+class MetaStruct(type):
+ def __new__(cls, clsname, bases, dct):
+ nt = namedtuple(clsname, [name for (name, _) in dct['SCHEMA']])
+ bases = tuple([Struct, nt] + list(bases))
+ return super(MetaStruct, cls).__new__(cls, clsname, bases, dct)
+"""
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py
new file mode 100644
index 0000000..01799bb
--- /dev/null
+++ b/kafka/protocol/types.py
@@ -0,0 +1,141 @@
+from __future__ import absolute_import
+
+from struct import pack, unpack
+
+from .abstract import AbstractType
+
+
+class Int8(AbstractType):
+ @classmethod
+ def encode(cls, value):
+ return pack('>b', value)
+
+ @classmethod
+ def decode(cls, data):
+ (value,) = unpack('>b', data.read(1))
+ return value
+
+
+class Int16(AbstractType):
+ @classmethod
+ def encode(cls, value):
+ return pack('>h', value)
+
+ @classmethod
+ def decode(cls, data):
+ (value,) = unpack('>h', data.read(2))
+ return value
+
+
+class Int32(AbstractType):
+ @classmethod
+ def encode(cls, value):
+ return pack('>i', value)
+
+ @classmethod
+ def decode(cls, data):
+ (value,) = unpack('>i', data.read(4))
+ return value
+
+
+class Int64(AbstractType):
+ @classmethod
+ def encode(cls, value):
+ return pack('>q', value)
+
+ @classmethod
+ def decode(cls, data):
+ (value,) = unpack('>q', data.read(8))
+ return value
+
+
+class String(AbstractType):
+ def __init__(self, encoding='utf-8'):
+ self.encoding = encoding
+
+ def encode(self, value):
+ if value is None:
+ return Int16.encode(-1)
+ value = str(value).encode(self.encoding)
+ return Int16.encode(len(value)) + value
+
+ def decode(self, data):
+ length = Int16.decode(data)
+ if length < 0:
+ return None
+ return data.read(length).decode(self.encoding)
+
+
+class Bytes(AbstractType):
+ @classmethod
+ def encode(cls, value):
+ if value is None:
+ return Int32.encode(-1)
+ else:
+ return Int32.encode(len(value)) + value
+
+ @classmethod
+ def decode(cls, data):
+ length = Int32.decode(data)
+ if length < 0:
+ return None
+ return data.read(length)
+
+
+class Schema(AbstractType):
+ def __init__(self, *fields):
+ if fields:
+ self.names, self.fields = zip(*fields)
+ else:
+ self.names, self.fields = (), ()
+
+ def encode(self, item):
+ if len(item) != len(self.fields):
+ raise ValueError('Item field count does not match Schema')
+ return b''.join([
+ field.encode(item[i])
+ for i, field in enumerate(self.fields)
+ ])
+
+ def decode(self, data):
+ return tuple([field.decode(data) for field in self.fields])
+
+ def __len__(self):
+ return len(self.fields)
+
+ def repr(self, value):
+ key_vals = []
+ try:
+ for i in range(len(self)):
+ try:
+ field_val = getattr(value, self.names[i])
+ except AttributeError:
+ field_val = value[i]
+ key_vals.append('%s=%s' % (self.names[i], self.fields[i].repr(field_val)))
+ return '(' + ', '.join(key_vals) + ')'
+ except:
+ return repr(value)
+
+
+class Array(AbstractType):
+ def __init__(self, *array_of):
+ if len(array_of) > 1:
+ self.array_of = Schema(*array_of)
+ elif len(array_of) == 1 and (isinstance(array_of[0], AbstractType) or
+ issubclass(array_of[0], AbstractType)):
+ self.array_of = array_of[0]
+ else:
+ raise ValueError('Array instantiated with no array_of type')
+
+ def encode(self, items):
+ return b''.join(
+ [Int32.encode(len(items))] +
+ [self.array_of.encode(item) for item in items]
+ )
+
+ def decode(self, data):
+ length = Int32.decode(data)
+ return [self.array_of.decode(data) for _ in range(length)]
+
+ def repr(self, list_of_items):
+ return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'
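
A few round-trip sketches for the primitive and composite types above (all values are illustrative):

    from io import BytesIO
    from kafka.protocol.types import Array, Int16, Int32, Schema, String

    assert Int32.encode(5) == b'\x00\x00\x00\x05'
    assert Int16.decode(BytesIO(b'\x00\x07')) == 7

    schema = Schema(('partition', Int32), ('topic', String('utf-8')))
    raw = schema.encode((0, 'my-topic'))
    assert schema.decode(BytesIO(raw)) == (0, 'my-topic')

    nested = Array(Int32)
    assert nested.decode(BytesIO(nested.encode([1, 2, 3]))) == [1, 2, 3]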