1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
from .api import AbstractRequest, AbstractResponse, MessageSet
from .types import Int8, Int16, Int32, Int64, Bytes, String, Array
class ProduceRequest(AbstractRequest):
    """Produce request (API key 0, version 0).

    Holds messages grouped by topic and partition, together with the
    acknowledgement and timeout settings, and serializes them with
    ``encode()``.
    """

    API_KEY = 0
    API_VERSION = 0
    __slots__ = ('required_acks', 'timeout', 'topic_partition_messages', 'compression')

    def __init__(self, topic_partition_messages,
                 required_acks=-1, timeout=1000, compression=None):
        """
        Store the request fields; nothing is encoded until ``encode()``.

        topic_partition_messages is a dict of dicts of lists (of messages)
        {
          "TopicFoo": {
            0: [
              Message('foo'),
              Message('bar')
            ],
            1: [
              Message('fizz'),
              Message('buzz')
            ]
          }
        }

        required_acks is encoded as an Int16 and timeout as an Int32
        (presumably milliseconds, per the Kafka protocol -- TODO confirm).
        compression is accepted and stored but not applied yet; see
        ``_encode_messages``.
        """
        self.required_acks = required_acks
        self.timeout = timeout
        self.topic_partition_messages = topic_partition_messages
        self.compression = compression

    @staticmethod
    def _encode_messages(partition, messages, compression):
        """Encode one partition's messages.

        Returns the concatenation of the Int32 partition id, the Int32
        byte length of the encoded message set, and the message set itself.
        """
        message_set = MessageSet.encode(messages)

        if compression:
            # TODO: compression is not implemented yet, so a truthy
            # `compression` currently has no effect. The intended steps are:
            # compress message_set data and re-encode as single message
            # then wrap single compressed message in a new message_set
            pass

        return (Int32.encode(partition) +
                Int32.encode(len(message_set)) +
                message_set)

    def encode(self):
        """Serialize the request.

        The payload is the Int16 required_acks, the Int32 timeout, and an
        array with one entry per topic: the topic name followed by an array
        of the per-partition encodings from ``_encode_messages``. The payload
        is then handed to ``AbstractRequest.encode`` (presumably to add the
        request header -- confirm against the base class).
        """
        # Use .items() rather than .iteritems(): the latter was removed in
        # Python 3, while .items() works on both Python 2 and 3.
        request = (
            Int16.encode(self.required_acks) +
            Int32.encode(self.timeout) +
            Array.encode([(
                String.encode(topic) +
                Array.encode([
                    self._encode_messages(partition, messages, self.compression)
                    for partition, messages in partitions.items()])
            ) for topic, partitions in self.topic_partition_messages.items()])
        )
        return super(ProduceRequest, self).encode(request)
|