summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-01-24 18:36:46 -0800
committerDana Powers <dana.powers@gmail.com>2016-01-24 18:36:46 -0800
commit077dc4742ffa82584946379790424faf4c6ba47f (patch)
treebd14706a8dfc429f6bf211bac02ad21af967c6ce /test
parent48e96822b3ec4f897438a2d1cdb735f51648cb48 (diff)
parent85c0dd2579eb6aa0b9492d9082d0f4cf4d8ea39d (diff)
downloadkafka-python-077dc4742ffa82584946379790424faf4c6ba47f.tar.gz
Merge pull request #515 from dpkp/kafka_producer
KafkaProducer
Diffstat (limited to 'test')
-rw-r--r--test/conftest.py33
-rw-r--r--test/fixtures.py3
-rw-r--r--test/test_consumer_group.py30
-rw-r--r--test/test_partitioner.py64
-rw-r--r--test/test_producer.py291
-rw-r--r--test/test_producer_legacy.py257
6 files changed, 369 insertions, 309 deletions
diff --git a/test/conftest.py b/test/conftest.py
new file mode 100644
index 0000000..f3a8947
--- /dev/null
+++ b/test/conftest.py
@@ -0,0 +1,33 @@
+import os
+
+import pytest
+
+from test.fixtures import KafkaFixture, ZookeeperFixture
+
+
+@pytest.fixture(scope="module")
+def version():
+ if 'KAFKA_VERSION' not in os.environ:
+ return ()
+ return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
+
+
+@pytest.fixture(scope="module")
+def zookeeper(version, request):
+ assert version
+ zk = ZookeeperFixture.instance()
+ def fin():
+ zk.close()
+ request.addfinalizer(fin)
+ return zk
+
+
+@pytest.fixture(scope="module")
+def kafka_broker(version, zookeeper, request):
+ assert version
+ k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
+ partitions=4)
+ def fin():
+ k.close()
+ request.addfinalizer(fin)
+ return k
diff --git a/test/fixtures.py b/test/fixtures.py
index 91a67c1..2613a41 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -5,10 +5,11 @@ import shutil
import subprocess
import tempfile
import time
-from six.moves import urllib
import uuid
+from six.moves import urllib
from six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
+
from test.service import ExternalService, SpawnedService
from test.testutil import get_open_port
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index 035d65a..f153d2d 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -12,38 +12,10 @@ from kafka.common import TopicPartition
from kafka.conn import BrokerConnection, ConnectionStates
from kafka.consumer.group import KafkaConsumer
-from test.fixtures import KafkaFixture, ZookeeperFixture
+from test.conftest import version
from test.testutil import random_string
-@pytest.fixture(scope="module")
-def version():
- if 'KAFKA_VERSION' not in os.environ:
- return ()
- return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
-
-
-@pytest.fixture(scope="module")
-def zookeeper(version, request):
- assert version
- zk = ZookeeperFixture.instance()
- def fin():
- zk.close()
- request.addfinalizer(fin)
- return zk
-
-
-@pytest.fixture(scope="module")
-def kafka_broker(version, zookeeper, request):
- assert version
- k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
- partitions=4)
- def fin():
- k.close()
- request.addfinalizer(fin)
- return k
-
-
@pytest.fixture
def simple_client(kafka_broker):
connect_str = 'localhost:' + str(kafka_broker.port)
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
index 67cd83b..52b6b81 100644
--- a/test/test_partitioner.py
+++ b/test/test_partitioner.py
@@ -1,23 +1,43 @@
+import pytest
import six
-from . import unittest
-
-from kafka.partitioner import (Murmur2Partitioner)
-
-class TestMurmurPartitioner(unittest.TestCase):
- def test_hash_bytes(self):
- p = Murmur2Partitioner(range(1000))
- self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test'))
-
- def test_hash_encoding(self):
- p = Murmur2Partitioner(range(1000))
- self.assertEqual(p.partition('test'), p.partition(u'test'))
-
- def test_murmur2_java_compatibility(self):
- p = Murmur2Partitioner(range(1000))
- # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
- self.assertEqual(681, p.partition(b''))
- self.assertEqual(524, p.partition(b'a'))
- self.assertEqual(434, p.partition(b'ab'))
- self.assertEqual(107, p.partition(b'abc'))
- self.assertEqual(566, p.partition(b'123456789'))
- self.assertEqual(742, p.partition(b'\x00 '))
+
+from kafka.partitioner import Murmur2Partitioner
+from kafka.partitioner.default import DefaultPartitioner
+
+
+def test_default_partitioner():
+ partitioner = DefaultPartitioner()
+ all_partitions = list(range(100))
+ available = all_partitions
+ # partitioner should return the same partition for the same key
+ p1 = partitioner(b'foo', all_partitions, available)
+ p2 = partitioner(b'foo', all_partitions, available)
+ assert p1 == p2
+ assert p1 in all_partitions
+
+ # when key is None, choose one of available partitions
+ assert partitioner(None, all_partitions, [123]) == 123
+
+ # with fallback to all_partitions
+ assert partitioner(None, all_partitions, []) in all_partitions
+
+
+def test_hash_bytes():
+ p = Murmur2Partitioner(range(1000))
+ assert p.partition(bytearray(b'test')) == p.partition(b'test')
+
+
+def test_hash_encoding():
+ p = Murmur2Partitioner(range(1000))
+ assert p.partition('test') == p.partition(u'test')
+
+
+def test_murmur2_java_compatibility():
+ p = Murmur2Partitioner(range(1000))
+ # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
+ assert p.partition(b'') == 681
+ assert p.partition(b'a') == 524
+ assert p.partition(b'ab') == 434
+ assert p.partition(b'abc') == 107
+ assert p.partition(b'123456789') == 566
+ assert p.partition(b'\x00 ') == 742
diff --git a/test/test_producer.py b/test/test_producer.py
index 850cb80..b84feb4 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,257 +1,34 @@
-# -*- coding: utf-8 -*-
-
-import collections
-import logging
-import threading
-import time
-
-from mock import MagicMock, patch
-from . import unittest
-
-from kafka import SimpleClient, SimpleProducer, KeyedProducer
-from kafka.common import (
- AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
- ProduceResponsePayload, RetryOptions, TopicPartition
-)
-from kafka.producer.base import Producer, _send_upstream
-from kafka.protocol import CODEC_NONE
-
-from six.moves import queue, xrange
-
-
-class TestKafkaProducer(unittest.TestCase):
- def test_producer_message_types(self):
-
- producer = Producer(MagicMock())
- topic = b"test-topic"
- partition = 0
-
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
- ('a', 'tuple'), {'a': 'dict'}, None,)
- for m in bad_data_types:
- with self.assertRaises(TypeError):
- logging.debug("attempting to send message of type %s", type(m))
- producer.send_messages(topic, partition, m)
-
- good_data_types = (b'a string!',)
- for m in good_data_types:
- # This should not raise an exception
- producer.send_messages(topic, partition, m)
-
- def test_keyedproducer_message_types(self):
- client = MagicMock()
- client.get_partition_ids_for_topic.return_value = [0, 1]
- producer = KeyedProducer(client)
- topic = b"test-topic"
- key = b"testkey"
-
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
- ('a', 'tuple'), {'a': 'dict'},)
- for m in bad_data_types:
- with self.assertRaises(TypeError):
- logging.debug("attempting to send message of type %s", type(m))
- producer.send_messages(topic, key, m)
-
- good_data_types = (b'a string!', None,)
- for m in good_data_types:
- # This should not raise an exception
- producer.send_messages(topic, key, m)
-
- def test_topic_message_types(self):
- client = MagicMock()
-
- def partitions(topic):
- return [0, 1]
-
- client.get_partition_ids_for_topic = partitions
-
- producer = SimpleProducer(client, random_start=False)
- topic = b"test-topic"
- producer.send_messages(topic, b'hi')
- assert client.send_produce_request.called
-
- @patch('kafka.producer.base._send_upstream')
- def test_producer_async_queue_overfilled(self, mock):
- queue_size = 2
- producer = Producer(MagicMock(), async=True,
- async_queue_maxsize=queue_size)
-
- topic = b'test-topic'
- partition = 0
- message = b'test-message'
-
- with self.assertRaises(AsyncProducerQueueFull):
- message_list = [message] * (queue_size + 1)
- producer.send_messages(topic, partition, *message_list)
- self.assertEqual(producer.queue.qsize(), queue_size)
- for _ in xrange(producer.queue.qsize()):
- producer.queue.get()
-
- def test_producer_sync_fail_on_error(self):
- error = FailedPayloadsError('failure')
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- with patch.object(SimpleClient, 'ensure_topic_exists'):
- with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
- with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
-
- client = SimpleClient(MagicMock())
- producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
-
- # This should not raise
- (response,) = producer.send_messages('foobar', b'test message')
- self.assertEqual(response, error)
-
- producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
- with self.assertRaises(FailedPayloadsError):
- producer.send_messages('foobar', b'test message')
-
- def test_cleanup_is_not_called_on_stopped_producer(self):
- producer = Producer(MagicMock(), async=True)
- producer.stopped = True
- with patch.object(producer, 'stop') as mocked_stop:
- producer._cleanup_func(producer)
- self.assertEqual(mocked_stop.call_count, 0)
-
- def test_cleanup_is_called_on_running_producer(self):
- producer = Producer(MagicMock(), async=True)
- producer.stopped = False
- with patch.object(producer, 'stop') as mocked_stop:
- producer._cleanup_func(producer)
- self.assertEqual(mocked_stop.call_count, 1)
-
-
-class TestKafkaProducerSendUpstream(unittest.TestCase):
-
- def setUp(self):
- self.client = MagicMock()
- self.queue = queue.Queue()
-
- def _run_process(self, retries_limit=3, sleep_timeout=1):
- # run _send_upstream process with the queue
- stop_event = threading.Event()
- retry_options = RetryOptions(limit=retries_limit,
- backoff_ms=50,
- retry_on_timeouts=False)
- self.thread = threading.Thread(
- target=_send_upstream,
- args=(self.queue, self.client, CODEC_NONE,
- 0.3, # batch time (seconds)
- 3, # batch length
- Producer.ACK_AFTER_LOCAL_WRITE,
- Producer.DEFAULT_ACK_TIMEOUT,
- retry_options,
- stop_event))
- self.thread.daemon = True
- self.thread.start()
- time.sleep(sleep_timeout)
- stop_event.set()
-
- def test_wo_retries(self):
-
- # lets create a queue and add 10 messages for 1 partition
- for i in range(10):
- self.queue.put((TopicPartition("test", 0), "msg %i", "key %i"))
-
- self._run_process()
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 4 non-void cals:
- # 3 batches of 3 msgs each + 1 batch of 1 message
- self.assertEqual(self.client.send_produce_request.call_count, 4)
-
- def test_first_send_failed(self):
-
- # lets create a queue and add 10 messages for 10 different partitions
- # to show how retries should work ideally
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
-
- # Mock offsets counter for closure
- offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
- self.client.is_first_time = True
- def send_side_effect(reqs, *args, **kwargs):
- if self.client.is_first_time:
- self.client.is_first_time = False
- return [FailedPayloadsError(req) for req in reqs]
- responses = []
- for req in reqs:
- offset = offsets[req.topic][req.partition]
- offsets[req.topic][req.partition] += len(req.messages)
- responses.append(
- ProduceResponsePayload(req.topic, req.partition, 0, offset)
- )
- return responses
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(2)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 5 non-void calls: 1st failed batch of 3 msgs
- # plus 3 batches of 3 msgs each + 1 batch of 1 message
- self.assertEqual(self.client.send_produce_request.call_count, 5)
-
- def test_with_limited_retries(self):
-
- # lets create a queue and add 10 messages for 10 different partitions
- # to show how retries should work ideally
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i))
-
- def send_side_effect(reqs, *args, **kwargs):
- return [FailedPayloadsError(req) for req in reqs]
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(3, 3)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 16 non-void calls:
- # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
- # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
- self.assertEqual(self.client.send_produce_request.call_count, 16)
-
- def test_async_producer_not_leader(self):
-
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
-
- # Mock offsets counter for closure
- offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
- self.client.is_first_time = True
- def send_side_effect(reqs, *args, **kwargs):
- if self.client.is_first_time:
- self.client.is_first_time = False
- return [ProduceResponsePayload(req.topic, req.partition,
- NotLeaderForPartitionError.errno, -1)
- for req in reqs]
-
- responses = []
- for req in reqs:
- offset = offsets[req.topic][req.partition]
- offsets[req.topic][req.partition] += len(req.messages)
- responses.append(
- ProduceResponsePayload(req.topic, req.partition, 0, offset)
- )
- return responses
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(2)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 5 non-void calls: 1st failed batch of 3 msgs
- # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
- self.assertEqual(self.client.send_produce_request.call_count, 5)
-
- def tearDown(self):
- for _ in xrange(self.queue.qsize()):
- self.queue.get()
+import pytest
+
+from kafka import KafkaConsumer, KafkaProducer
+from test.conftest import version
+from test.testutil import random_string
+
+
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+def test_end_to_end(kafka_broker):
+ connect_str = 'localhost:' + str(kafka_broker.port)
+ producer = KafkaProducer(bootstrap_servers=connect_str,
+ max_block_ms=10000,
+ value_serializer=str.encode)
+ consumer = KafkaConsumer(bootstrap_servers=connect_str,
+ consumer_timeout_ms=10000,
+ auto_offset_reset='earliest',
+ value_deserializer=bytes.decode)
+
+ topic = random_string(5)
+
+ for i in range(1000):
+ producer.send(topic, 'msg %d' % i)
+ producer.flush()
+ producer.close()
+
+ consumer.subscribe([topic])
+ msgs = set()
+ for i in range(1000):
+ try:
+ msgs.add(next(consumer).value)
+ except StopIteration:
+ break
+
+ assert msgs == set(['msg %d' % i for i in range(1000)])
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
new file mode 100644
index 0000000..850cb80
--- /dev/null
+++ b/test/test_producer_legacy.py
@@ -0,0 +1,257 @@
+# -*- coding: utf-8 -*-
+
+import collections
+import logging
+import threading
+import time
+
+from mock import MagicMock, patch
+from . import unittest
+
+from kafka import SimpleClient, SimpleProducer, KeyedProducer
+from kafka.common import (
+ AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
+ ProduceResponsePayload, RetryOptions, TopicPartition
+)
+from kafka.producer.base import Producer, _send_upstream
+from kafka.protocol import CODEC_NONE
+
+from six.moves import queue, xrange
+
+
+class TestKafkaProducer(unittest.TestCase):
+ def test_producer_message_types(self):
+
+ producer = Producer(MagicMock())
+ topic = b"test-topic"
+ partition = 0
+
+ bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+ ('a', 'tuple'), {'a': 'dict'}, None,)
+ for m in bad_data_types:
+ with self.assertRaises(TypeError):
+ logging.debug("attempting to send message of type %s", type(m))
+ producer.send_messages(topic, partition, m)
+
+ good_data_types = (b'a string!',)
+ for m in good_data_types:
+ # This should not raise an exception
+ producer.send_messages(topic, partition, m)
+
+ def test_keyedproducer_message_types(self):
+ client = MagicMock()
+ client.get_partition_ids_for_topic.return_value = [0, 1]
+ producer = KeyedProducer(client)
+ topic = b"test-topic"
+ key = b"testkey"
+
+ bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
+ ('a', 'tuple'), {'a': 'dict'},)
+ for m in bad_data_types:
+ with self.assertRaises(TypeError):
+ logging.debug("attempting to send message of type %s", type(m))
+ producer.send_messages(topic, key, m)
+
+ good_data_types = (b'a string!', None,)
+ for m in good_data_types:
+ # This should not raise an exception
+ producer.send_messages(topic, key, m)
+
+ def test_topic_message_types(self):
+ client = MagicMock()
+
+ def partitions(topic):
+ return [0, 1]
+
+ client.get_partition_ids_for_topic = partitions
+
+ producer = SimpleProducer(client, random_start=False)
+ topic = b"test-topic"
+ producer.send_messages(topic, b'hi')
+ assert client.send_produce_request.called
+
+ @patch('kafka.producer.base._send_upstream')
+ def test_producer_async_queue_overfilled(self, mock):
+ queue_size = 2
+ producer = Producer(MagicMock(), async=True,
+ async_queue_maxsize=queue_size)
+
+ topic = b'test-topic'
+ partition = 0
+ message = b'test-message'
+
+ with self.assertRaises(AsyncProducerQueueFull):
+ message_list = [message] * (queue_size + 1)
+ producer.send_messages(topic, partition, *message_list)
+ self.assertEqual(producer.queue.qsize(), queue_size)
+ for _ in xrange(producer.queue.qsize()):
+ producer.queue.get()
+
+ def test_producer_sync_fail_on_error(self):
+ error = FailedPayloadsError('failure')
+ with patch.object(SimpleClient, 'load_metadata_for_topics'):
+ with patch.object(SimpleClient, 'ensure_topic_exists'):
+ with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
+ with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
+
+ client = SimpleClient(MagicMock())
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+
+ # This should not raise
+ (response,) = producer.send_messages('foobar', b'test message')
+ self.assertEqual(response, error)
+
+ producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+ with self.assertRaises(FailedPayloadsError):
+ producer.send_messages('foobar', b'test message')
+
+ def test_cleanup_is_not_called_on_stopped_producer(self):
+ producer = Producer(MagicMock(), async=True)
+ producer.stopped = True
+ with patch.object(producer, 'stop') as mocked_stop:
+ producer._cleanup_func(producer)
+ self.assertEqual(mocked_stop.call_count, 0)
+
+ def test_cleanup_is_called_on_running_producer(self):
+ producer = Producer(MagicMock(), async=True)
+ producer.stopped = False
+ with patch.object(producer, 'stop') as mocked_stop:
+ producer._cleanup_func(producer)
+ self.assertEqual(mocked_stop.call_count, 1)
+
+
+class TestKafkaProducerSendUpstream(unittest.TestCase):
+
+ def setUp(self):
+ self.client = MagicMock()
+ self.queue = queue.Queue()
+
+ def _run_process(self, retries_limit=3, sleep_timeout=1):
+ # run _send_upstream process with the queue
+ stop_event = threading.Event()
+ retry_options = RetryOptions(limit=retries_limit,
+ backoff_ms=50,
+ retry_on_timeouts=False)
+ self.thread = threading.Thread(
+ target=_send_upstream,
+ args=(self.queue, self.client, CODEC_NONE,
+ 0.3, # batch time (seconds)
+ 3, # batch length
+ Producer.ACK_AFTER_LOCAL_WRITE,
+ Producer.DEFAULT_ACK_TIMEOUT,
+ retry_options,
+ stop_event))
+ self.thread.daemon = True
+ self.thread.start()
+ time.sleep(sleep_timeout)
+ stop_event.set()
+
+ def test_wo_retries(self):
+
+ # lets create a queue and add 10 messages for 1 partition
+ for i in range(10):
+ self.queue.put((TopicPartition("test", 0), "msg %i", "key %i"))
+
+ self._run_process()
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 4 non-void cals:
+ # 3 batches of 3 msgs each + 1 batch of 1 message
+ self.assertEqual(self.client.send_produce_request.call_count, 4)
+
+ def test_first_send_failed(self):
+
+ # lets create a queue and add 10 messages for 10 different partitions
+ # to show how retries should work ideally
+ for i in range(10):
+ self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
+
+ # Mock offsets counter for closure
+ offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
+ self.client.is_first_time = True
+ def send_side_effect(reqs, *args, **kwargs):
+ if self.client.is_first_time:
+ self.client.is_first_time = False
+ return [FailedPayloadsError(req) for req in reqs]
+ responses = []
+ for req in reqs:
+ offset = offsets[req.topic][req.partition]
+ offsets[req.topic][req.partition] += len(req.messages)
+ responses.append(
+ ProduceResponsePayload(req.topic, req.partition, 0, offset)
+ )
+ return responses
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(2)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 5 non-void calls: 1st failed batch of 3 msgs
+ # plus 3 batches of 3 msgs each + 1 batch of 1 message
+ self.assertEqual(self.client.send_produce_request.call_count, 5)
+
+ def test_with_limited_retries(self):
+
+ # lets create a queue and add 10 messages for 10 different partitions
+ # to show how retries should work ideally
+ for i in range(10):
+ self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i))
+
+ def send_side_effect(reqs, *args, **kwargs):
+ return [FailedPayloadsError(req) for req in reqs]
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(3, 3)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 16 non-void calls:
+ # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
+ # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
+ self.assertEqual(self.client.send_produce_request.call_count, 16)
+
+ def test_async_producer_not_leader(self):
+
+ for i in range(10):
+ self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
+
+ # Mock offsets counter for closure
+ offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
+ self.client.is_first_time = True
+ def send_side_effect(reqs, *args, **kwargs):
+ if self.client.is_first_time:
+ self.client.is_first_time = False
+ return [ProduceResponsePayload(req.topic, req.partition,
+ NotLeaderForPartitionError.errno, -1)
+ for req in reqs]
+
+ responses = []
+ for req in reqs:
+ offset = offsets[req.topic][req.partition]
+ offsets[req.topic][req.partition] += len(req.messages)
+ responses.append(
+ ProduceResponsePayload(req.topic, req.partition, 0, offset)
+ )
+ return responses
+
+ self.client.send_produce_request.side_effect = send_side_effect
+
+ self._run_process(2)
+
+ # the queue should be void at the end of the test
+ self.assertEqual(self.queue.empty(), True)
+
+ # there should be 5 non-void calls: 1st failed batch of 3 msgs
+ # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
+ self.assertEqual(self.client.send_produce_request.call_count, 5)
+
+ def tearDown(self):
+ for _ in xrange(self.queue.qsize()):
+ self.queue.get()