diff options
-rw-r--r-- | kafka/client.py | 6 | ||||
-rw-r--r-- | kafka/producer.py | 4 | ||||
-rw-r--r-- | test/integration.py | 11 |
3 files changed, 11 insertions, 10 deletions
diff --git a/kafka/client.py b/kafka/client.py index 862a30e..23b0a48 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -143,7 +143,7 @@ class KafkaClient(object): for conn in self.conns.values(): conn.close() - def send_produce_request(self, payloads=[], fail_on_error=True, callback=None): + def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): """ Encode and send some ProduceRequests @@ -162,8 +162,8 @@ class KafkaClient(object): list of ProduceResponse or callback(ProduceResponse), in the order of input payloads """ resps = self._send_broker_aware_request(payloads, - KafkaProtocol.encode_produce_request, - KafkaProtocol.decode_produce_response) + partial(KafkaProtocol.encode_produce_request, acks=acks, timeout=timeout), + KafkaProtocol.decode_produce_response) out = [] for resp in resps: # Check for errors diff --git a/kafka/producer.py b/kafka/producer.py index 93d6e3b..47e690b 100644 --- a/kafka/producer.py +++ b/kafka/producer.py @@ -16,8 +16,8 @@ class SimpleProducer(object): self.client._load_metadata_for_topics(topic) self.next_partition = cycle(self.client.topic_partitions[topic]) - def send_message(self, msg): + def send_messages(self, *msg): req = ProduceRequest(self.topic, self.next_partition.next(), - messages=[create_message(msg)]) + messages=[create_message(m) for m in msg]) resp = self.client.send_produce_request([req])[0] assert resp.error == 0 diff --git a/test/integration.py b/test/integration.py index e51b398..04d5979 100644 --- a/test/integration.py +++ b/test/integration.py @@ -359,22 +359,23 @@ class TestKafkaClient(unittest.TestCase): def test_simple_producer(self): producer = SimpleProducer(self.client, "test_simple_producer") - producer.send_message("one") - producer.send_message("two") + producer.send_messages("one", "two") + producer.send_messages("three") fetch1 = FetchRequest("test_simple_producer", 0, 0, 1024) fetch2 = FetchRequest("test_simple_producer", 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) self.assertEquals(fetch_resp1.error, 0) - self.assertEquals(fetch_resp1.highwaterMark, 1) + self.assertEquals(fetch_resp1.highwaterMark, 2) messages = list(fetch_resp1.messages) - self.assertEquals(len(messages), 1) + self.assertEquals(len(messages), 2) self.assertEquals(messages[0].message.value, "one") + self.assertEquals(messages[1].message.value, "two") self.assertEquals(fetch_resp2.error, 0) self.assertEquals(fetch_resp2.highwaterMark, 1) messages = list(fetch_resp2.messages) self.assertEquals(len(messages), 1) - self.assertEquals(messages[0].message.value, "two") + self.assertEquals(messages[0].message.value, "three") class TestConsumer(unittest.TestCase): @classmethod |