author     Dana Powers <dana.powers@rd.io>   2016-01-25 17:32:06 -0800
committer  Dana Powers <dana.powers@rd.io>   2016-01-25 17:35:52 -0800
commit     a2e9eb5214da94ee8d71a66315ed4a8bf08baf5a (patch)
tree       8cf6c4e62664c0dd4bcccd207eaaaa79310a450e /docs/simple.rst
parent     650a27103cad82256f7d2be2853d628d187566c5 (diff)
download   kafka-python-a2e9eb5214da94ee8d71a66315ed4a8bf08baf5a.tar.gz
Update docs w/ KafkaProducer; move Simple clients to separate document
Diffstat (limited to 'docs/simple.rst')
-rw-r--r--  docs/simple.rst  144
1 file changed, 144 insertions, 0 deletions
diff --git a/docs/simple.rst b/docs/simple.rst
new file mode 100644
index 0000000..00a21ac
--- /dev/null
+++ b/docs/simple.rst
@@ -0,0 +1,144 @@
+Simple APIs (DEPRECATED)
+************************
+
+
+SimpleConsumer
+==============
+
+.. code:: python
+
+    from kafka import SimpleConsumer, SimpleClient
+
+    # To consume messages
+    client = SimpleClient('localhost:9092')
+    consumer = SimpleConsumer(client, "my-group", "my-topic")
+    for message in consumer:
+        # message is a raw byte string -- decode if necessary!
+        # e.g., for unicode: `message.decode('utf-8')`
+        print(message)
+
+
+    # Use multiprocessing for parallel consumers
+    from kafka import MultiProcessConsumer
+
+    # This will split the topic's partitions among two processes
+    consumer = MultiProcessConsumer(client, "my-group", "my-topic", num_procs=2)
+
+    # This will spawn processes such that each handles at most 2 partitions
+    consumer = MultiProcessConsumer(client, "my-group", "my-topic",
+                                    partitions_per_proc=2)
+
+    for message in consumer:
+        print(message)
+
+    for message in consumer.get_messages(count=5, block=True, timeout=4):
+        print(message)
+
+    client.close()
+
+
+SimpleProducer
+==============
+
+Asynchronous Mode
+-----------------
+
+.. code:: python
+
+    from kafka import SimpleProducer, SimpleClient
+
+    # To send messages asynchronously
+    client = SimpleClient('localhost:9092')
+    producer = SimpleProducer(client, async=True)
+    producer.send_messages('my-topic', b'async message')
+
+    # To send messages in batches. You can use any of the available
+    # producers for this. The following producer collects messages
+    # and sends them to Kafka once 20 messages have accumulated or
+    # every 60 seconds, whichever comes first.
+    # Notes:
+    # * If the producer dies before the messages are sent, they are lost
+    # * Call producer.stop() to send any pending messages and clean up
+    producer = SimpleProducer(client,
+                              async=True,
+                              batch_send_every_n=20,
+                              batch_send_every_t=60)
+
+Synchronous Mode
+----------------
+
+.. code:: python
+
+    import logging
+
+    from kafka import SimpleProducer, SimpleClient
+
+    # To send messages synchronously
+    client = SimpleClient('localhost:9092')
+    producer = SimpleProducer(client, async=False)
+
+    # Note that the application is responsible for encoding messages to type bytes
+    producer.send_messages('my-topic', b'some message')
+    producer.send_messages('my-topic', b'this method', b'is variadic')
+
+    # Send unicode message
+    producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
+
+    # To wait for acknowledgements:
+    # ACK_AFTER_LOCAL_WRITE : server waits until the data is written to
+    #                         its local log before sending a response
+    # ACK_AFTER_CLUSTER_COMMIT : server blocks until the message is committed
+    #                            by all in-sync replicas before sending a response
+    producer = SimpleProducer(client,
+                              async=False,
+                              req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
+                              ack_timeout=2000,
+                              sync_fail_on_error=False)
+
+    responses = producer.send_messages('my-topic', b'another message')
+    for r in responses:
+        logging.info(r.offset)
+
+
+KeyedProducer
+=============
+
+.. code:: python
+
+    from kafka import (
+        SimpleClient, KeyedProducer,
+        Murmur2Partitioner, RoundRobinPartitioner)
+
+    kafka = SimpleClient('localhost:9092')
+
+    # HashedPartitioner is the default (currently uses python hash())
+    producer = KeyedProducer(kafka)
+    producer.send_messages(b'my-topic', b'key1', b'some message')
+    producer.send_messages(b'my-topic', b'key2', b'this method')
+
+    # Murmur2Partitioner attempts to mirror the java client hashing
+    producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
+
+    # Or just produce round-robin (or just use SimpleProducer)
+    producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
+
+
+SimpleClient
+============
+
+.. code:: python
+
+    from kafka import SimpleClient, create_message
+    from kafka.common import ProduceRequest
+
+    kafka = SimpleClient("localhost:9092")
+
+    # Messages must be bytes -- encode before calling create_message
+    req = ProduceRequest(topic="my-topic", partition=1,
+                         messages=[create_message(b"some message")])
+    resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
+    kafka.close()
+
+    resps[0].topic      # "my-topic"
+    resps[0].partition  # 1
+    resps[0].error      # 0 (hopefully)
+    resps[0].offset     # offset of the first message sent in this request
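The heading above marks these Simple* classes as deprecated, and the commit message notes that the main docs now cover KafkaProducer instead. As a minimal sketch of the replacement API -- assuming kafka-python's KafkaProducer and KafkaConsumer classes, which this diff does not itself show -- the producer and consumer examples above translate roughly to:

.. code:: python

    from kafka import KafkaConsumer, KafkaProducer

    # Producer: values are still raw bytes, as with SimpleProducer
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    producer.send('my-topic', b'some message')
    producer.flush()  # block until buffered messages are sent

    # Consumer: iteration yields ConsumerRecord objects rather than raw bytes
    consumer = KafkaConsumer('my-topic',
                             group_id='my-group',
                             bootstrap_servers='localhost:9092')
    for record in consumer:
        print(record.value)  # value is a raw byte string -- decode if necessary

Unlike SimpleClient, these clients manage broker connections and group coordination internally, so no separate client object is needed.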