from itertools import cycle
from multiprocessing import Queue, Process
import logging
from kafka.common import ProduceRequest
from kafka.protocol import create_message
from kafka.partitioner import HashedPartitioner
log = logging.getLogger("kafka")
class Producer(object):
    """
    Base class to be used by producers

    Params:
    client - The Kafka client instance to use
    async - If set to true, the messages are sent asynchronously via another
            thread (process). We will not wait for a response to these
    req_acks - A value indicating the acknowledgements that the server must
               receive before responding to the request
    ack_timeout - Value (in milliseconds) indicating a timeout for waiting
                  for an acknowledgement

    NOTE(review): an earlier docstring also listed a ``topic`` param, but
    __init__ does not accept one -- the topic is held by the subclasses.
    """

    # Valid values for req_acks: how many broker acknowledgements a
    # produce request must collect before the server responds.
    ACK_NOT_REQUIRED = 0 # No ack is required
    ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
    ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed

    DEFAULT_ACK_TIMEOUT = 1000 # milliseconds

    def __init__(self, client, async=False, req_acks=ACK_AFTER_LOCAL_WRITE,
                 ack_timeout=DEFAULT_ACK_TIMEOUT):
        self.client = client
        self.async = async
        self.req_acks = req_acks
        self.ack_timeout = ack_timeout
        if self.async:
            self.queue = Queue() # Messages are sent through this queue
            self.proc = Process(target=self._send_upstream, args=(self.queue,))
            self.proc.daemon = True # Process will die if main thread exits
            self.proc.start()

    def _send_upstream(self, queue):
        """
        Listen on the queue for messages and send them upstream to the brokers

        Runs forever in the background process started by __init__.
        """
        while True:
            req = queue.get()
            # The response (including any ack outcome) is discarded in async
            # mode, but the request is still sent with the configured
            # req_acks / ack_timeout settings.
            self.client.send_produce_request([req], acks=self.req_acks,
                                             timeout=self.ack_timeout)

    def send_request(self, req):
        """
        Helper method to send produce requests

        Returns the broker response(s) in sync mode; in async mode the
        request is queued for the background process and [] is returned
        immediately.
        """
        resp = []
        if self.async:
            self.queue.put(req)
        else:
            resp = self.client.send_produce_request([req], acks=self.req_acks,
                                                    timeout=self.ack_timeout)
        return resp

    def stop(self):
        # Kill the background sender (async mode only). terminate() is
        # abrupt: any requests still sitting in the queue are dropped.
        if self.async:
            self.proc.terminate()
            self.proc.join()
class SimpleProducer(Producer):
"""
A simple, round-robbin producer. Each message goes to exactly one partition
Params:
client - The Kafka client instance to use
topic - The topic for sending messages to
async - If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
req_acks - A value indicating the acknowledgements that the server must
receive before responding to the request
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement
"""
def __init__(self, client, topic, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
self.topic = topic
client._load_metadata_for_topics(topic)
self.next_partition = cycle(client.topic_partitions[topic])
super(SimpleProducer, self).__init__(client, async,
req_acks, ack_timeout)
def send_messages(self, *msg):
req = ProduceRequest(self.topic, self.next_partition.next(),
messages=[create_message(m) for m in msg])
return self.send_request(req)
class KeyedProducer(Producer):
"""
A producer which distributes messages to partitions based on the key
Args:
client - The kafka client instance
topic - The kafka topic to send messages to
partitioner - A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner
async - If True, the messages are sent asynchronously via another
thread (process). We will not wait for a response to these
ack_timeout - Value (in milliseconds) indicating a timeout for waiting
for an acknowledgement
"""
def __init__(self, client, topic, partitioner=None, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT):
self.topic = topic
client._load_metadata_for_topics(topic)
if not partitioner:
partitioner = HashedPartitioner
self.partitioner = partitioner(client.topic_partitions[topic])
super(KeyedProducer, self).__init__(client, async,
req_acks, ack_timeout)
def send(self, key, msg):
partitions = self.client.topic_partitions[self.topic]
partition = self.partitioner.partition(key, partitions)
req = ProduceRequest(self.topic, partition,
messages=[create_message(msg)])
return self.send_request(req)