diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-25 17:32:06 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-25 17:35:52 -0800 |
commit | a2e9eb5214da94ee8d71a66315ed4a8bf08baf5a (patch) | |
tree | 8cf6c4e62664c0dd4bcccd207eaaaa79310a450e /docs/usage.rst | |
parent | 650a27103cad82256f7d2be2853d628d187566c5 (diff) | |
download | kafka-python-a2e9eb5214da94ee8d71a66315ed4a8bf08baf5a.tar.gz |
Update docs w/ KafkaProducer; move Simple clients to separate document
Diffstat (limited to 'docs/usage.rst')
-rw-r--r-- | docs/usage.rst | 96 |
1 files changed, 26 insertions, 70 deletions
diff --git a/docs/usage.rst b/docs/usage.rst index e74e5af..f2bea06 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -50,85 +50,41 @@ There are many configuration options for the consumer class. See :class:`~kafka.KafkaConsumer` API documentation for more details. -SimpleProducer +KafkaProducer ============== -Asynchronous Mode ------------------ - -.. code:: python - - from kafka import SimpleProducer, SimpleClient - - # To send messages asynchronously - client = SimpleClient('localhost:9092') - producer = SimpleProducer(client, async=True) - producer.send_messages('my-topic', b'async message') - - # To send messages in batch. You can use any of the available - # producers for doing this. The following producer will collect - # messages in batch and send them to Kafka after 20 messages are - # collected or every 60 seconds - # Notes: - # * If the producer dies before the messages are sent, there will be losses - # * Call producer.stop() to send the messages and cleanup - producer = SimpleProducer(client, - async=True, - batch_send_every_n=20, - batch_send_every_t=60) - -Synchronous Mode ----------------- - .. code:: python - from kafka import SimpleProducer, SimpleClient + from kafka import KafkaProducer - # To send messages synchronously - client = SimpleClient('localhost:9092') - producer = SimpleProducer(client, async=False) + producer = KafkaProducer(bootstrap_servers=['broker1:1234']) - # Note that the application is responsible for encoding messages to type bytes - producer.send_messages('my-topic', b'some message') - producer.send_messages('my-topic', b'this method', b'is variadic') + # Asynchronous by default + future = producer.send('my-topic', b'raw_bytes') - # Send unicode message - producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8')) - - # To wait for acknowledgements - # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to - # a local log before sending response - # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed - # by all in sync replicas before sending a response - producer = SimpleProducer(client, - async=False, - req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE, - ack_timeout=2000, - sync_fail_on_error=False) - - responses = producer.send_messages('my-topic', b'another message') - for r in responses: - logging.info(r.offset) - - -KeyedProducer -============= - -.. code:: python + # Block for 'synchronous' sends + try: + record_metadata = future.get(timeout=10) + except KafkaError: + # Decide what to do if produce request failed... + log.exception() + pass - from kafka import ( - SimpleClient, KeyedProducer, - Murmur2Partitioner, RoundRobinPartitioner) + # Successful result returns assigned partition and offset + print (record_metadata.topic) + print (record_metadata.partition) + print (record_metadata.offset) - kafka = SimpleClient('localhost:9092') + # produce keyed messages to enable hashed partitioning + producer.send('my-topic', key=b'foo', value=b'bar') - # HashedPartitioner is default (currently uses python hash()) - producer = KeyedProducer(kafka) - producer.send_messages(b'my-topic', b'key1', b'some message') - producer.send_messages(b'my-topic', b'key2', b'this methode') + # encode objects via msgpack + producer = KafkaProducer(value_serializer=msgpack.dumps) + producer.send('msgpack-topic', {'key': 'value'}) - # Murmur2Partitioner attempts to mirror the java client hashing - producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner) + # produce json messages + producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii')) + producer.send('json-topic', {'key': 'value'}) - # Or just produce round-robin (or just use SimpleProducer) - producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner) + # configure multiple retries + producer = KafkaProducer(retries=5) |