summaryrefslogtreecommitdiff
path: root/kafka/protocol/produce.py
blob: b8753970cce91df434450b9a22f7240c7f9f7913 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from .api import AbstractRequest, AbstractResponse, MessageSet
from .types import Int8, Int16, Int32, Int64, Bytes, String, Array


class ProduceRequest(AbstractRequest):
    """Request that carries messages grouped by topic and partition.

    The payload built by :meth:`encode` is, in order: ``required_acks``
    (Int16), ``timeout`` (Int32), and an array with one entry per topic.
    Each topic entry is the topic name (String) followed by an array of
    per-partition entries produced by :meth:`_encode_messages`.
    """
    API_KEY = 0
    API_VERSION = 0
    __slots__ = ('required_acks', 'timeout', 'topic_partition_messages', 'compression')

    def __init__(self, topic_partition_messages,
                 required_acks=-1, timeout=1000, compression=None):
        """
        Store the request fields; nothing is encoded until encode() is called.

        topic_partition_messages is a dict of dicts of lists (of messages)
        {
          "TopicFoo": {
            0: [
              Message('foo'),
              Message('bar')
            ],
            1: [
              Message('fizz'),
              Message('buzz')
            ]
          }
        }

        required_acks is encoded as an Int16 (default -1).
        timeout is encoded as an Int32 (default 1000; presumably
        milliseconds -- confirm against the protocol specification).
        compression is passed through to _encode_messages, where it is
        currently a no-op (see the note there).
        """
        self.required_acks = required_acks
        self.timeout = timeout
        self.topic_partition_messages = topic_partition_messages
        self.compression = compression

    @staticmethod
    def _encode_messages(partition, messages, compression):
        """Encode the messages destined for a single partition.

        Returns the partition id (Int32), the byte length of the encoded
        message set (Int32), and the encoded message set itself,
        concatenated in that order.

        NOTE: compression is not implemented yet. A truthy ``compression``
        value is accepted but ignored, so the message set is always sent
        uncompressed.
        """
        message_set = MessageSet.encode(messages)

        if compression:
            # TODO: compress message_set data and re-encode as single message
            # then wrap single compressed message in a new message_set
            pass

        return (Int32.encode(partition) +
                Int32.encode(len(message_set)) +
                message_set)

    def encode(self):
        """Serialize the request body and hand it to the base class encoder.

        Returns whatever ``AbstractRequest.encode`` returns for the
        assembled body (presumably the body with request framing added --
        confirm against the base class).
        """
        # dict.items() is used instead of dict.iteritems(): the latter only
        # exists on Python 2, while items() iterates the same pairs on both
        # Python 2 and Python 3.
        request = (
            Int16.encode(self.required_acks) +
            Int32.encode(self.timeout) +
            Array.encode([(
                String.encode(topic) +
                Array.encode([
                    self._encode_messages(partition, messages, self.compression)
                    for partition, messages in partitions.items()])
            ) for topic, partitions in self.topic_partition_messages.items()])
        )
        return super(ProduceRequest, self).encode(request)