diff options
author | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-04-21 19:44:27 +0300 |
---|---|---|
committer | Viktor Shlapakov <vshlapakov@gmail.com> | 2015-06-03 11:22:48 +0300 |
commit | 948e046b5443e0f38f6062e13153b57d29915a68 (patch) | |
tree | 328929235e06519a54453e44aa057938f6e0aa56 /kafka | |
parent | f41e5f3e4befda52a20f072f85b807d77361e64d (diff) | |
download | kafka-python-948e046b5443e0f38f6062e13153b57d29915a68.tar.gz |
Fix small issues with names/tests
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/common.py | 4 | ||||
-rw-r--r-- | kafka/producer/base.py | 16 |
2 files changed, 11 insertions, 9 deletions
diff --git a/kafka/common.py b/kafka/common.py index 50f8a77..0e769e4 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -217,6 +217,10 @@ class KafkaConfigurationError(KafkaError): pass +class AsyncProducerQueueFull(KafkaError): + pass + + def _iter_broker_errors(): for name, obj in inspect.getmembers(sys.modules[__name__]): if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError: diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 0e005c5..3f0431c 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -5,9 +5,9 @@ import logging import time try: - from queue import Empty, Queue + from queue import Empty, Full, Queue except ImportError: - from Queue import Empty, Queue + from Queue import Empty, Full, Queue from collections import defaultdict from threading import Thread, Event @@ -16,7 +16,8 @@ import six from kafka.common import ( ProduceRequest, TopicAndPartition, RetryOptions, - UnsupportedCodecError, FailedPayloadsError, RequestTimedOutError + UnsupportedCodecError, FailedPayloadsError, + RequestTimedOutError, AsyncProducerQueueFull ) from kafka.common import ( RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES) @@ -187,11 +188,8 @@ class Producer(object): self.codec = codec if self.async: - log.warning("async producer does not guarantee message delivery!") - log.warning("Current implementation does not retry Failed messages") - log.warning("Use at your own risk! (or help improve with a PR!)") # Messages are sent through this queue - self.queue = Queue(async_queue_maxsize) + self.queue = Queue(async_queue_maxsize) self.thread_stop_event = Event() self.thread = Thread(target=_send_upstream, args=(self.queue, @@ -258,8 +256,8 @@ class Producer(object): item = (TopicAndPartition(topic, partition), m, key) self.queue.put_nowait(item) except Full: - raise BatchQueueOverfilledError( - 'Producer batch send queue overfilled. ' + raise AsyncProducerQueueFull( + 'Producer async queue overfilled. ' 'Current queue size %d.' % self.queue.qsize()) resp = [] else: |