-rw-r--r--  kafka/common.py          4
-rw-r--r--  kafka/producer/base.py  16
-rw-r--r--  test/test_producer.py   12
3 files changed, 16 insertions, 16 deletions
diff --git a/kafka/common.py b/kafka/common.py
index 50f8a77..0e769e4 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -217,6 +217,10 @@ class KafkaConfigurationError(KafkaError):
     pass


+class AsyncProducerQueueFull(KafkaError):
+    pass
+
+
 def _iter_broker_errors():
     for name, obj in inspect.getmembers(sys.modules[__name__]):
         if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
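The new exception subclasses KafkaError, so existing handlers that catch the base class keep working. A minimal sketch (not part of the patch, assuming the patched kafka.common above is importable):

# Hedged sketch: AsyncProducerQueueFull can be caught directly or via KafkaError.
from kafka.common import AsyncProducerQueueFull, KafkaError

try:
    raise AsyncProducerQueueFull('Producer async queue overfilled. '
                                 'Current queue size 2.')
except KafkaError as err:
    print('caught %s: %s' % (type(err).__name__, err))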
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 0e005c5..3f0431c 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -5,9 +5,9 @@ import logging
import time
try:
-    from queue import Empty, Queue
+    from queue import Empty, Full, Queue
 except ImportError:
-    from Queue import Empty, Queue
+    from Queue import Empty, Full, Queue
from collections import defaultdict
from threading import Thread, Event
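For context on the added Full import: the stdlib behavior the producer relies on is that put_nowait() on a bounded Queue raises Full once maxsize items are enqueued. A standalone illustration, independent of kafka-python:

# Stdlib-only sketch of the Full condition the producer translates into
# AsyncProducerQueueFull further down.
try:
    from queue import Full, Queue    # Python 3
except ImportError:
    from Queue import Full, Queue    # Python 2

q = Queue(maxsize=2)
q.put_nowait(b'm1')
q.put_nowait(b'm2')
try:
    q.put_nowait(b'm3')
except Full:
    print('queue full after 2 items')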
@@ -16,7 +16,8 @@ import six
from kafka.common import (
     ProduceRequest, TopicAndPartition, RetryOptions,
-    UnsupportedCodecError, FailedPayloadsError, RequestTimedOutError
+    UnsupportedCodecError, FailedPayloadsError,
+    RequestTimedOutError, AsyncProducerQueueFull
)
from kafka.common import (
     RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
@@ -187,11 +188,8 @@ class Producer(object):
         self.codec = codec

         if self.async:
-            log.warning("async producer does not guarantee message delivery!")
-            log.warning("Current implementation does not retry Failed messages")
-            log.warning("Use at your own risk! (or help improve with a PR!)")
             # Messages are sent through this queue
-            self.queue = Queue(async_queue_maxsize)
+            self.queue = Queue(async_queue_maxsize)
             self.thread_stop_event = Event()
             self.thread = Thread(target=_send_upstream,
                                  args=(self.queue,
@@ -258,8 +256,8 @@ class Producer(object):
                     item = (TopicAndPartition(topic, partition), m, key)
                     self.queue.put_nowait(item)
                 except Full:
-                    raise BatchQueueOverfilledError(
-                        'Producer batch send queue overfilled. '
+                    raise AsyncProducerQueueFull(
+                        'Producer async queue overfilled. '
                         'Current queue size %d.' % self.queue.qsize())
             resp = []
         else:
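A hedged usage sketch of the renamed error path, patterned on the updated tests below (the MagicMock client and the async/async_queue_maxsize arguments come from the test code, not new API introduced here):

# Sketch only: overfilling the bounded async queue now raises
# AsyncProducerQueueFull instead of BatchQueueOverfilledError.
from mock import MagicMock

from kafka.common import AsyncProducerQueueFull
from kafka.producer.base import Producer

producer = Producer(MagicMock(), async=True, async_queue_maxsize=2)
try:
    # Three messages against a maxsize-2 queue should overflow it.
    producer.send_messages(b'test-topic', 0, b'm1', b'm2', b'm3')
except AsyncProducerQueueFull as err:
    print('async queue overfilled: %s' % err)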
diff --git a/test/test_producer.py b/test/test_producer.py
index 627178d..de012b9 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -7,7 +7,7 @@ from mock import MagicMock, patch
from . import unittest
from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions
-from kafka.common import BatchQueueOverfilledError
+from kafka.common import AsyncProducerQueueFull
from kafka.producer.base import Producer
from kafka.producer.base import _send_upstream
from kafka.protocol import CODEC_NONE
@@ -52,8 +52,7 @@ class TestKafkaProducer(unittest.TestCase):
         producer.send_messages(topic, b'hi')
         assert client.send_produce_request.called

-    @patch('kafka.producer.base.Process')
-    def test_producer_async_queue_overfilled_batch_send(self, process_mock):
+    def test_producer_async_queue_overfilled_batch_send(self):
         queue_size = 2
         producer = Producer(MagicMock(), batch_send=True,
                             async_queue_maxsize=queue_size)
@@ -62,12 +61,11 @@ class TestKafkaProducer(unittest.TestCase):
         partition = 0
         message = b'test-message'

-        with self.assertRaises(BatchQueueOverfilledError):
+        with self.assertRaises(AsyncProducerQueueFull):
             message_list = [message] * (queue_size + 1)
             producer.send_messages(topic, partition, *message_list)

-    @patch('kafka.producer.base.Process')
-    def test_producer_async_queue_overfilled(self, process_mock):
+    def test_producer_async_queue_overfilled(self):
         queue_size = 2
         producer = Producer(MagicMock(), async=True,
                             async_queue_maxsize=queue_size)
@@ -76,7 +74,7 @@ class TestKafkaProducer(unittest.TestCase):
         partition = 0
         message = b'test-message'

-        with self.assertRaises(BatchQueueOverfilledError):
+        with self.assertRaises(AsyncProducerQueueFull):
             message_list = [message] * (queue_size + 1)
             producer.send_messages(topic, partition, *message_list)
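Why the @patch('kafka.producer.base.Process') decorators could be dropped: the async worker in the producer diff above is a threading.Thread, so these tests presumably no longer have any Process to mock. A small sketch under that assumption:

# Sketch: the async producer's worker is a Thread (see the constructor hunk
# above), so no multiprocessing mock is needed.
from threading import Thread

from mock import MagicMock

from kafka.producer.base import Producer

producer = Producer(MagicMock(), async=True, async_queue_maxsize=2)
assert isinstance(producer.thread, Thread)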