diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-04-21 19:44:27 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-03 11:22:48 +0300 |
commit | 948e046b5443e0f38f6062e13153b57d29915a68 (patch) | |
tree | 328929235e06519a54453e44aa057938f6e0aa56 | |
parent | f41e5f3e4befda52a20f072f85b807d77361e64d (diff) | |
download | kafka-python-948e046b5443e0f38f6062e13153b57d29915a68.tar.gz |
Fix small issues with names/tests
-rw-r--r-- | kafka/common.py | 4 | ||||
-rw-r--r-- | kafka/producer/base.py | 16 | ||||
-rw-r--r-- | test/test_producer.py | 12 |
3 files changed, 16 insertions, 16 deletions
diff --git a/kafka/common.py b/kafka/common.py index 50f8a77..0e769e4 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -217,6 +217,10 @@ class KafkaConfigurationError(KafkaError): pass +class AsyncProducerQueueFull(KafkaError): + pass + + def _iter_broker_errors(): for name, obj in inspect.getmembers(sys.modules[__name__]): if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError: diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 0e005c5..3f0431c 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -5,9 +5,9 @@ import logging import time try: - from queue import Empty, Queue + from queue import Empty, Full, Queue except ImportError: - from Queue import Empty, Queue + from Queue import Empty, Full, Queue from collections import defaultdict from threading import Thread, Event @@ -16,7 +16,8 @@ import six from kafka.common import ( ProduceRequest, TopicAndPartition, RetryOptions, - UnsupportedCodecError, FailedPayloadsError, RequestTimedOutError + UnsupportedCodecError, FailedPayloadsError, + RequestTimedOutError, AsyncProducerQueueFull ) from kafka.common import ( RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES) @@ -187,11 +188,8 @@ class Producer(object): self.codec = codec if self.async: - log.warning("async producer does not guarantee message delivery!") - log.warning("Current implementation does not retry Failed messages") - log.warning("Use at your own risk! (or help improve with a PR!)") # Messages are sent through this queue - self.queue = Queue(async_queue_maxsize) + self.queue = Queue(async_queue_maxsize) self.thread_stop_event = Event() self.thread = Thread(target=_send_upstream, args=(self.queue, @@ -258,8 +256,8 @@ class Producer(object): item = (TopicAndPartition(topic, partition), m, key) self.queue.put_nowait(item) except Full: - raise BatchQueueOverfilledError( - 'Producer batch send queue overfilled. ' + raise AsyncProducerQueueFull( + 'Producer async queue overfilled. ' 'Current queue size %d.' % self.queue.qsize()) resp = [] else: diff --git a/test/test_producer.py b/test/test_producer.py index 627178d..de012b9 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,7 +7,7 @@ from mock import MagicMock, patch from . import unittest from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions -from kafka.common import BatchQueueOverfilledError +from kafka.common import AsyncProducerQueueFull from kafka.producer.base import Producer from kafka.producer.base import _send_upstream from kafka.protocol import CODEC_NONE @@ -52,8 +52,7 @@ class TestKafkaProducer(unittest.TestCase): producer.send_messages(topic, b'hi') assert client.send_produce_request.called - @patch('kafka.producer.base.Process') - def test_producer_async_queue_overfilled_batch_send(self, process_mock): + def test_producer_async_queue_overfilled_batch_send(self): queue_size = 2 producer = Producer(MagicMock(), batch_send=True, async_queue_maxsize=queue_size) @@ -62,12 +61,11 @@ class TestKafkaProducer(unittest.TestCase): partition = 0 message = b'test-message' - with self.assertRaises(BatchQueueOverfilledError): + with self.assertRaises(AsyncProducerQueueFull): message_list = [message] * (queue_size + 1) producer.send_messages(topic, partition, *message_list) - @patch('kafka.producer.base.Process') - def test_producer_async_queue_overfilled(self, process_mock): + def test_producer_async_queue_overfilled(self): queue_size = 2 producer = Producer(MagicMock(), async=True, async_queue_maxsize=queue_size) @@ -76,7 +74,7 @@ class TestKafkaProducer(unittest.TestCase): partition = 0 message = b'test-message' - with self.assertRaises(BatchQueueOverfilledError): + with self.assertRaises(AsyncProducerQueueFull): message_list = [message] * (queue_size + 1) producer.send_messages(topic, partition, *message_list) |