author     Dana Powers <dana.powers@rd.io>    2015-06-05 14:12:14 -0700
committer  Dana Powers <dana.powers@rd.io>    2015-06-06 19:27:56 -0700
commit     aa217e05448b4eced017b5ecdcb020a4411f863f (patch)
tree       0d3a32aed0b2773643af5c7189c63b44a53dfd68
parent     0f1579b047fc63c09596897cc1c83730bd0ddb94 (diff)
download   kafka-python-aa217e05448b4eced017b5ecdcb020a4411f863f.tar.gz
Deprecate async producer batch_send kwarg -- use 'async' instead
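As a minimal sketch of the migration this commit asks for (grounded in the docs/usage.rst hunk below; the broker address is illustrative), callers that used batch_send=True to enable the background send thread should now pass async=True, while batch_send_every_n and batch_send_every_t keep their meaning and only take effect when async is True:

    from kafka import SimpleProducer, KafkaClient

    kafka = KafkaClient('localhost:9092')  # illustrative broker address

    # Before (deprecated): SimpleProducer(kafka, batch_send=True, ...)
    # After this change:
    producer = SimpleProducer(kafka, async=True,
                              batch_send_every_n=20,  # flush after 20 messages...
                              batch_send_every_t=60)  # ...or after 60 seconds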
-rw-r--r--   docs/usage.rst                     |  2
-rw-r--r--   kafka/producer/base.py             | 62
-rw-r--r--   kafka/producer/keyed.py            |  4
-rw-r--r--   kafka/producer/simple.py           |  4
-rw-r--r--   test/test_producer.py              | 17
-rw-r--r--   test/test_producer_integration.py  |  4
6 files changed, 47 insertions, 46 deletions
diff --git a/docs/usage.rst b/docs/usage.rst
index 150d121..cdacfdc 100644
--- a/docs/usage.rst
+++ b/docs/usage.rst
@@ -47,7 +47,7 @@ SimpleProducer
     # Notes:
     # * If the producer dies before the messages are sent, there will be losses
     # * Call producer.stop() to send the messages and cleanup
-    producer = SimpleProducer(kafka, batch_send=True,
+    producer = SimpleProducer(kafka, async=True,
                               batch_send_every_n=20,
                               batch_send_every_t=60)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index cd14ab6..9f4942b 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -166,32 +166,54 @@ class Producer(object):
     Base class to be used by producers
 
     Arguments:
-        client: The Kafka client instance to use
-        async: If set to true, the messages are sent asynchronously via another
-            thread (process). We will not wait for a response to these
-            WARNING!!! current implementation of async producer does not
-            guarantee message delivery. Use at your own risk! Or help us
-            improve with a PR!
-        req_acks: A value indicating the acknowledgements that the server must
-            receive before responding to the request
-        ack_timeout: Value (in milliseconds) indicating a timeout for waiting
-            for an acknowledgement
-        batch_send: If True, messages are send in batches
-        batch_send_every_n: If set, messages are send in batches of this size
-        batch_send_every_t: If set, messages are send after this timeout
+        client (KafkaClient): instance to use for broker communications.
+        codec (kafka.protocol.ALL_CODECS): compression codec to use.
+        req_acks (int, optional): A value indicating the acknowledgements that
+            the server must receive before responding to the request,
+            defaults to 1 (local ack).
+        ack_timeout (int, optional): millisecond timeout to wait for the
+            configured req_acks, defaults to 1000.
+        async (bool, optional): send message using a background thread,
+            defaults to False.
+        batch_send_every_n (int, optional): If async is True, messages are
+            sent in batches of this size, defaults to 20.
+        batch_send_every_t (int or float, optional): If async is True,
+            messages are sent immediately after this timeout in seconds, even
+            if there are fewer than batch_send_every_n, defaults to 20.
+        async_retry_limit (int, optional): number of retries for failed messages
+            or None for unlimited, defaults to None / unlimited.
+        async_retry_backoff_ms (int, optional): milliseconds to backoff on
+            failed messages, defaults to 100.
+        async_retry_on_timeouts (bool, optional): whether to retry on
+            RequestTimeoutError, defaults to True.
+        async_queue_maxsize (int, optional): limit to the size of the
+            internal message queue in number of messages (not size), defaults
+            to 0 (no limit).
+        async_queue_put_timeout (int or float, optional): timeout seconds
+            for queue.put in send_messages for async producers -- will only
+            apply if async_queue_maxsize > 0 and the queue is Full,
+            defaults to 0 (fail immediately on full queue).
+        async_log_messages_on_error (bool, optional): set to False and the
+            async producer will only log hash() contents on failed produce
+            requests, defaults to True (log full messages). Hash logging
+            will not allow you to identify the specific message that failed,
+            but it will allow you to match failures with retries.
+
+    Deprecated Arguments:
+        batch_send (bool, optional): If True, messages are sent by a background
+            thread in batches, defaults to False. Deprecated, use 'async'
     """
-
     ACK_NOT_REQUIRED = 0            # No ack is required
     ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
     ACK_AFTER_CLUSTER_COMMIT = -1   # Send response after data is committed
-
     DEFAULT_ACK_TIMEOUT = 1000
 
-    def __init__(self, client, async=False,
+    def __init__(self, client,
                  req_acks=ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=DEFAULT_ACK_TIMEOUT,
                  codec=None,
-                 batch_send=False,
+                 async=False,
+                 batch_send=False, # deprecated, use async
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                  async_retry_limit=ASYNC_RETRY_LIMIT,
@@ -201,14 +223,10 @@ class Producer(object):
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
                  async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR):
 
-        if batch_send:
-            async = True
+        if async:
             assert batch_send_every_n > 0
             assert batch_send_every_t > 0
             assert async_queue_maxsize >= 0
-        else:
-            batch_send_every_n = 1
-            batch_send_every_t = 3600
 
         self.client = client
         self.async = async
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 6bb2285..2de4dcc 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -49,8 +49,8 @@ class KeyedProducer(Producer):
         self.partitioner_class = partitioner
         self.partitioners = {}
 
-        super(KeyedProducer, self).__init__(client, async, req_acks,
-                                            ack_timeout, codec, batch_send,
+        super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
+                                            codec, async, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t,
                                             async_retry_limit,
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 78cc21c..280a02e 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -54,8 +54,8 @@ class SimpleProducer(Producer):
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         self.partition_cycles = {}
         self.random_start = random_start
-        super(SimpleProducer, self).__init__(client, async, req_acks,
-                                             ack_timeout, codec, batch_send,
+        super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
+                                             codec, async, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t,
                                              async_retry_limit,
diff --git a/test/test_producer.py b/test/test_producer.py
index 85a5a2e..c12af02 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -57,23 +57,6 @@ class TestKafkaProducer(unittest.TestCase):
         assert client.send_produce_request.called
 
     @patch('kafka.producer.base._send_upstream')
-    def test_producer_async_queue_overfilled_batch_send(self, mock):
-        queue_size = 2
-        producer = Producer(MagicMock(), batch_send=True,
-                            async_queue_maxsize=queue_size)
-
-        topic = b'test-topic'
-        partition = 0
-        message = b'test-message'
-
-        with self.assertRaises(AsyncProducerQueueFull):
-            message_list = [message] * (queue_size + 1)
-            producer.send_messages(topic, partition, *message_list)
-        self.assertEqual(producer.queue.qsize(), queue_size)
-        for _ in xrange(producer.queue.qsize()):
-            producer.queue.get()
-
-    @patch('kafka.producer.base._send_upstream')
     def test_producer_async_queue_overfilled(self, mock):
         queue_size = 2
         producer = Producer(MagicMock(), async=True,
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 099b975..3c414e1 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -221,7 +221,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         batch_interval = 5
         producer = SimpleProducer(
             self.client,
-            batch_send=True,
+            async=True,
             batch_send_every_n=batch_messages,
             batch_send_every_t=batch_interval,
             random_start=False)
@@ -287,7 +287,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
         batch_interval = 5
         producer = SimpleProducer(
             self.client,
-            batch_send=True,
+            async=True,
             batch_send_every_n=100,
             batch_send_every_t=batch_interval,
             random_start=False)
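Beyond the rename, Producer.__init__ also moves async later in its positional parameter list (after codec rather than immediately after client), which is why the super() calls in keyed.py and simple.py are reordered above. A minimal defensive sketch for external callers, using only names already present in this diff (the broker address is illustrative): pass everything after client by keyword so the call is unaffected by the positional reordering.

    from kafka import KafkaClient
    from kafka.producer.base import Producer

    client = KafkaClient('localhost:9092')  # illustrative broker address

    # Keyword arguments are unaffected by the __init__ reordering in this commit.
    producer = Producer(client,
                        req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
                        ack_timeout=1000,
                        async=True,
                        batch_send_every_n=20,
                        batch_send_every_t=20)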