author    Dana Powers <dana.powers@rd.io>  2016-01-25 17:32:06 -0800
committer Dana Powers <dana.powers@rd.io>  2016-01-25 17:35:52 -0800
commit    a2e9eb5214da94ee8d71a66315ed4a8bf08baf5a (patch)
tree      8cf6c4e62664c0dd4bcccd207eaaaa79310a450e /docs/usage.rst
parent    650a27103cad82256f7d2be2853d628d187566c5 (diff)
Update docs w/ KafkaProducer; move Simple clients to separate document
Diffstat (limited to 'docs/usage.rst')
 -rw-r--r--  docs/usage.rst | 96
 1 file changed, 26 insertions(+), 70 deletions(-)
diff --git a/docs/usage.rst b/docs/usage.rst
index e74e5af..f2bea06 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -50,85 +50,41 @@ There are many configuration options for the consumer class. See
:class:`~kafka.KafkaConsumer` API documentation for more details.
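+ For example, a minimal consumer sketch (the group name and broker
+ address below are illustrative values, not defaults):
+
+ .. code:: python
+
+     from kafka import KafkaConsumer
+
+     consumer = KafkaConsumer('my-topic',
+                              group_id='my-group',
+                              bootstrap_servers=['localhost:9092'])
+     for message in consumer:
+         print(message.topic, message.partition, message.offset, message.value)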
-SimpleProducer
+KafkaProducer
==============
-Asynchronous Mode
------------------
-
-.. code:: python
-
- from kafka import SimpleProducer, SimpleClient
-
- # To send messages asynchronously
- client = SimpleClient('localhost:9092')
- producer = SimpleProducer(client, async=True)
- producer.send_messages('my-topic', b'async message')
-
- # To send messages in batch. You can use any of the available
- # producers for doing this. The following producer will collect
- # messages in batch and send them to Kafka after 20 messages are
- # collected or every 60 seconds
- # Notes:
- # * If the producer dies before the messages are sent, there will be losses
- # * Call producer.stop() to send the messages and cleanup
- producer = SimpleProducer(client,
-                           async=True,
-                           batch_send_every_n=20,
-                           batch_send_every_t=60)
-
-Synchronous Mode
-----------------
-
.. code:: python
- from kafka import SimpleProducer, SimpleClient
+ from kafka import KafkaProducer
+ from kafka.errors import KafkaError
- # To send messages synchronously
- client = SimpleClient('localhost:9092')
- producer = SimpleProducer(client, async=False)
+ producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
- # Note that the application is responsible for encoding messages to type bytes
- producer.send_messages('my-topic', b'some message')
- producer.send_messages('my-topic', b'this method', b'is variadic')
+ # Asynchronous by default
+ future = producer.send('my-topic', b'raw_bytes')
- # Send unicode message
- producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
-
- # To wait for acknowledgements
- # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
- # a local log before sending response
- # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
- # by all in sync replicas before sending a response
- producer = SimpleProducer(client,
-                           async=False,
-                           req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
-                           ack_timeout=2000,
-                           sync_fail_on_error=False)
-
- responses = producer.send_messages('my-topic', b'another message')
- for r in responses:
-     logging.info(r.offset)
-
-
-KeyedProducer
-=============
-
-.. code:: python
+ # Block for 'synchronous' sends
+ try:
+     record_metadata = future.get(timeout=10)
+ except KafkaError:
+     # Decide what to do if produce request failed...
+     log.exception('Failed to send message')
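+ # Alternatively, register callbacks instead of blocking -- a sketch
+ # using the returned future's add_callback/add_errback hooks
+ def on_send_success(record_metadata):
+     log.info(record_metadata.topic)
+
+ def on_send_error(excp):
+     log.error('error sending message', exc_info=excp)
+
+ producer.send('my-topic', b'raw_bytes') \
+     .add_callback(on_send_success) \
+     .add_errback(on_send_error)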
- from kafka import (
- SimpleClient, KeyedProducer,
- Murmur2Partitioner, RoundRobinPartitioner)
+ # Successful result returns assigned partition and offset
+ print(record_metadata.topic)
+ print(record_metadata.partition)
+ print(record_metadata.offset)
- kafka = SimpleClient('localhost:9092')
+ # produce keyed messages to enable hashed partitioning
+ producer.send('my-topic', key=b'foo', value=b'bar')
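+ # keys must be bytes by default; a key_serializer can encode them
+ # for you (a sketch -- str.encode is just one possible serializer)
+ producer = KafkaProducer(key_serializer=str.encode)
+ producer.send('my-topic', key='foo', value=b'bar')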
- # HashedPartitioner is default (currently uses python hash())
- producer = KeyedProducer(kafka)
- producer.send_messages(b'my-topic', b'key1', b'some message')
- producer.send_messages(b'my-topic', b'key2', b'this method')
+ # encode objects via msgpack (requires the msgpack-python package)
+ import msgpack
+ producer = KafkaProducer(value_serializer=msgpack.dumps)
+ producer.send('msgpack-topic', {'key': 'value'})
- # Murmur2Partitioner attempts to mirror the java client hashing
- producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
+ # produce json messages
+ import json
+ producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
+ producer.send('json-topic', {'key': 'value'})
- # Or just produce round-robin (or just use SimpleProducer)
- producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
+ # configure multiple retries
+ producer = KafkaProducer(retries=5)
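+ # block until all buffered messages are sent (or delivery fails)
+ producer.flush()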