Diffstat (limited to 'test')
-rw-r--r--  test/__init__.py                       7
-rw-r--r--  test/conftest.py                       9
-rw-r--r--  test/fixtures.py                       7
-rw-r--r--  test/test_client.py                  405
-rw-r--r--  test/test_client_integration.py       95
-rw-r--r--  test/test_consumer.py                135
-rw-r--r--  test/test_consumer_integration.py    498
-rw-r--r--  test/test_context.py                 117
-rw-r--r--  test/test_failover_integration.py    240
-rw-r--r--  test/test_package.py                  18
-rw-r--r--  test/test_partitioner.py              39
-rw-r--r--  test/test_producer_integration.py    529
-rw-r--r--  test/test_producer_legacy.py         257
-rw-r--r--  test/test_protocol_legacy.py         848
-rw-r--r--  test/test_util.py                     85
-rw-r--r--  test/testutil.py                     105
16 files changed, 19 insertions(+), 3375 deletions(-)
diff --git a/test/__init__.py b/test/__init__.py
index 3d2ba3d..71f667d 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -1,12 +1,5 @@
from __future__ import absolute_import
-import sys
-
-if sys.version_info < (2, 7):
- import unittest2 as unittest # pylint: disable=import-error
-else:
- import unittest
-
# Set default logging handler to avoid "No handler found" warnings.
import logging
try: # Python 2.7+
diff --git a/test/conftest.py b/test/conftest.py
index bbe4048..3fa0262 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -43,15 +43,6 @@ def kafka_broker_factory(zookeeper):
@pytest.fixture
-def simple_client(kafka_broker, request, topic):
- """Return a SimpleClient fixture"""
- client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,))
- client.ensure_topic_exists(topic)
- yield client
- client.close()
-
-
-@pytest.fixture
def kafka_client(kafka_broker, request):
"""Return a KafkaClient fixture"""
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
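[Migration note: tests that used the removed simple_client fixture can rely on the kafka_client fixture above, or on a KafkaConsumer-based fixture along these lines. This is a sketch, not part of the change itself; the fixture name and options are illustrative, and kafka_broker.bootstrap_server() is the fixture helper visible in test/fixtures.py below.]

from kafka import KafkaConsumer

@pytest.fixture
def kafka_consumer(kafka_broker, topic, request):
    """Return a KafkaConsumer subscribed to the test topic."""
    consumer = KafkaConsumer(topic,
                             bootstrap_servers=kafka_broker.bootstrap_server(),
                             client_id='%s_consumer' % (request.node.name,),
                             auto_offset_reset='earliest')
    yield consumer
    consumer.close()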
diff --git a/test/fixtures.py b/test/fixtures.py
index 68572b5..557fca6 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -13,8 +13,7 @@ import py
from kafka.vendor.six.moves import urllib, range
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
-from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient, KafkaAdminClient
-from kafka.client_async import KafkaClient
+from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
from kafka.protocol.admin import CreateTopicsRequest
from kafka.protocol.metadata import MetadataRequest
from test.testutil import env_kafka_version, random_string
@@ -524,7 +523,3 @@ class KafkaFixture(Fixture):
for x in range(cnt):
params['client_id'] = '%s_%s' % (client_id, random_string(4))
yield KafkaProducer(**params)
-
- def get_simple_client(self, **params):
- params.setdefault('client_id', 'simple_client')
- return SimpleClient(self.bootstrap_server(), **params)
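[Migration note: with get_simple_client gone, call sites can build a client straight from the fixture's bootstrap address. A minimal sketch of the replacement pattern; the variable names are illustrative.]

from kafka import KafkaClient

broker = KafkaFixture.instance(0, zk)  # zk: a ZookeeperFixture, as elsewhere in this module
client = KafkaClient(bootstrap_servers=broker.bootstrap_server(),
                     client_id='simple_client_replacement')
client.poll(timeout_ms=500)  # drive the event loop to bootstrap metadata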
diff --git a/test/test_client.py b/test/test_client.py
deleted file mode 100644
index 1c68978..0000000
--- a/test/test_client.py
+++ /dev/null
@@ -1,405 +0,0 @@
-import socket
-
-from mock import ANY, MagicMock, patch
-from operator import itemgetter
-from kafka.vendor import six
-from . import unittest
-
-from kafka import SimpleClient
-from kafka.errors import (
- KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
- UnknownTopicOrPartitionError, FailedPayloadsError)
-from kafka.future import Future
-from kafka.protocol import KafkaProtocol, create_message
-from kafka.protocol.metadata import MetadataResponse
-from kafka.structs import ProduceRequestPayload, BrokerMetadata, TopicPartition
-
-
-NO_ERROR = 0
-UNKNOWN_TOPIC_OR_PARTITION = 3
-NO_LEADER = 5
-
-
-def mock_conn(conn, success=True):
- mocked = MagicMock()
- mocked.connected.return_value = True
- if success:
- mocked.send.return_value = Future().success(True)
- else:
- mocked.send.return_value = Future().failure(Exception())
- conn.return_value = mocked
- conn.recv.return_value = []
-
-
-class TestSimpleClient(unittest.TestCase):
- def test_init_with_list(self):
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
-
- self.assertEqual(
- sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC),
- ('kafka03', 9092, socket.AF_UNSPEC)]),
- sorted(client.hosts))
-
- def test_init_with_csv(self):
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- client = SimpleClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092')
-
- self.assertEqual(
- sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC),
- ('kafka03', 9092, socket.AF_UNSPEC)]),
- sorted(client.hosts))
-
- def test_init_with_unicode_csv(self):
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- client = SimpleClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092')
-
- self.assertEqual(
- sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC),
- ('kafka03', 9092, socket.AF_UNSPEC)]),
- sorted(client.hosts))
-
- @patch.object(SimpleClient, '_get_conn')
- @patch.object(SimpleClient, 'load_metadata_for_topics')
- def test_send_broker_unaware_request_fail(self, load_metadata, conn):
- mocked_conns = {
- ('kafka01', 9092): MagicMock(),
- ('kafka02', 9092): MagicMock()
- }
- for val in mocked_conns.values():
- mock_conn(val, success=False)
-
- def mock_get_conn(host, port, afi):
- return mocked_conns[(host, port)]
- conn.side_effect = mock_get_conn
-
- client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092'])
-
- req = KafkaProtocol.encode_metadata_request()
- with self.assertRaises(KafkaUnavailableError):
- client._send_broker_unaware_request(payloads=['fake request'],
- encoder_fn=MagicMock(return_value='fake encoded message'),
- decoder_fn=lambda x: x)
-
- for key, conn in six.iteritems(mocked_conns):
- conn.send.assert_called_with('fake encoded message')
-
- def test_send_broker_unaware_request(self):
- mocked_conns = {
- ('kafka01', 9092): MagicMock(),
- ('kafka02', 9092): MagicMock(),
- ('kafka03', 9092): MagicMock()
- }
- # inject BrokerConnection side effects
- mock_conn(mocked_conns[('kafka01', 9092)], success=False)
- mock_conn(mocked_conns[('kafka03', 9092)], success=False)
- future = Future()
- mocked_conns[('kafka02', 9092)].send.return_value = future
- mocked_conns[('kafka02', 9092)].recv.return_value = [('valid response', future)]
-
- def mock_get_conn(host, port, afi):
- return mocked_conns[(host, port)]
-
- # patch to avoid making requests before we want it
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- with patch.object(SimpleClient, '_get_conn', side_effect=mock_get_conn):
-
- client = SimpleClient(hosts='kafka01:9092,kafka02:9092')
- resp = client._send_broker_unaware_request(payloads=['fake request'],
- encoder_fn=MagicMock(),
- decoder_fn=lambda x: x)
-
- self.assertEqual('valid response', resp)
- mocked_conns[('kafka02', 9092)].recv.assert_called_once_with()
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_load_metadata(self, protocol, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_ERROR, 'topic_1', [
- (NO_ERROR, 0, 1, [1, 2], [1, 2])
- ]),
- (NO_ERROR, 'topic_noleader', [
- (NO_LEADER, 0, -1, [], []),
- (NO_LEADER, 1, -1, [], []),
- ]),
- (NO_LEADER, 'topic_no_partitions', []),
- (UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []),
- (NO_ERROR, 'topic_3', [
- (NO_ERROR, 0, 0, [0, 1], [0, 1]),
- (NO_ERROR, 1, 1, [1, 0], [1, 0]),
- (NO_ERROR, 2, 0, [0, 1], [0, 1])
- ])
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- # client loads metadata at init
- client = SimpleClient(hosts=['broker_1:4567'])
- self.assertDictEqual({
- TopicPartition('topic_1', 0): brokers[1],
- TopicPartition('topic_noleader', 0): None,
- TopicPartition('topic_noleader', 1): None,
- TopicPartition('topic_3', 0): brokers[0],
- TopicPartition('topic_3', 1): brokers[1],
- TopicPartition('topic_3', 2): brokers[0]},
- client.topics_to_brokers)
-
- # if we ask for metadata explicitly, it should raise errors
- with self.assertRaises(LeaderNotAvailableError):
- client.load_metadata_for_topics('topic_no_partitions')
-
- with self.assertRaises(UnknownTopicOrPartitionError):
- client.load_metadata_for_topics('topic_unknown')
-
- # This should not raise
- client.load_metadata_for_topics('topic_noleader')
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_has_metadata_for_topic(self, protocol, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_LEADER, 'topic_still_creating', []),
- (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
- (NO_ERROR, 'topic_noleaders', [
- (NO_LEADER, 0, -1, [], []),
- (NO_LEADER, 1, -1, [], []),
- ]),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- # Topics with no partitions return False
- self.assertFalse(client.has_metadata_for_topic('topic_still_creating'))
- self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist'))
-
- # Topic with partition metadata, but no leaders, returns True
- self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol.decode_metadata_response')
- def test_ensure_topic_exists(self, decode_metadata_response, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_LEADER, 'topic_still_creating', []),
- (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
- (NO_ERROR, 'topic_noleaders', [
- (NO_LEADER, 0, -1, [], []),
- (NO_LEADER, 1, -1, [], []),
- ]),
- ]
- decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- with self.assertRaises(UnknownTopicOrPartitionError):
- client.ensure_topic_exists('topic_doesnt_exist', timeout=1)
-
- with self.assertRaises(KafkaTimeoutError):
- client.ensure_topic_exists('topic_still_creating', timeout=1)
-
- # This should not raise
- client.ensure_topic_exists('topic_noleaders', timeout=1)
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
- "Get leader for partitions reload metadata if it is not available"
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_LEADER, 'topic_no_partitions', [])
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- # topic metadata is loaded but empty
- self.assertDictEqual({}, client.topics_to_brokers)
-
- topics = [
- (NO_ERROR, 'topic_one_partition', [
- (NO_ERROR, 0, 0, [0, 1], [0, 1])
- ])
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- # calling _get_leader_for_partition (from any broker aware request)
- # will try loading metadata again for the same topic
- leader = client._get_leader_for_partition('topic_one_partition', 0)
-
- self.assertEqual(brokers[0], leader)
- self.assertDictEqual({
- TopicPartition('topic_one_partition', 0): brokers[0]},
- client.topics_to_brokers)
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_get_leader_for_unassigned_partitions(self, protocol, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_LEADER, 'topic_no_partitions', []),
- (UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- self.assertDictEqual({}, client.topics_to_brokers)
-
- with self.assertRaises(LeaderNotAvailableError):
- client._get_leader_for_partition('topic_no_partitions', 0)
-
- with self.assertRaises(UnknownTopicOrPartitionError):
- client._get_leader_for_partition('topic_unknown', 0)
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_get_leader_exceptions_when_noleader(self, protocol, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_ERROR, 'topic_noleader', [
- (NO_LEADER, 0, -1, [], []),
- (NO_LEADER, 1, -1, [], []),
- ]),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
- self.assertDictEqual(
- {
- TopicPartition('topic_noleader', 0): None,
- TopicPartition('topic_noleader', 1): None
- },
- client.topics_to_brokers)
-
- # No leader partitions -- raise LeaderNotAvailableError
- with self.assertRaises(LeaderNotAvailableError):
- self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
- with self.assertRaises(LeaderNotAvailableError):
- self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
-
- # Unknown partitions -- raise UnknownTopicOrPartitionError
- with self.assertRaises(UnknownTopicOrPartitionError):
- self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2))
-
- topics = [
- (NO_ERROR, 'topic_noleader', [
- (NO_ERROR, 0, 0, [0, 1], [0, 1]),
- (NO_ERROR, 1, 1, [1, 0], [1, 0])
- ]),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
- self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
- self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
-
- @patch.object(SimpleClient, '_get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_send_produce_request_raises_when_noleader(self, protocol, conn):
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_ERROR, 'topic_noleader', [
- (NO_LEADER, 0, -1, [], []),
- (NO_LEADER, 1, -1, [], []),
- ]),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- requests = [ProduceRequestPayload(
- "topic_noleader", 0,
- [create_message("a"), create_message("b")])]
-
- with self.assertRaises(FailedPayloadsError):
- client.send_produce_request(requests)
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- requests = [ProduceRequestPayload(
- "topic_doesnt_exist", 0,
- [create_message("a"), create_message("b")])]
-
- with self.assertRaises(FailedPayloadsError):
- client.send_produce_request(requests)
-
- def test_correlation_rollover(self):
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- big_num = 2**31 - 3
- client = SimpleClient(hosts=(), correlation_id=big_num)
- self.assertEqual(big_num + 1, client._next_id())
- self.assertEqual(big_num + 2, client._next_id())
- self.assertEqual(0, client._next_id())
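[Migration note: the metadata behaviors tested above, leader lookup and topic discovery, are handled by KafkaClient and its ClusterMetadata in the modern API. A rough equivalent follows as a sketch; the broker address mirrors the mock data above but is otherwise arbitrary, and leader_for_partition returns a node id rather than a BrokerMetadata.]

from kafka import KafkaClient
from kafka.structs import TopicPartition

client = KafkaClient(bootstrap_servers='broker_1:4567')
client.add_topic('topic_1')                # mark the topic for metadata fetches
client.poll(timeout_ms=1000)               # drive IO; refreshes cluster metadata
partitions = client.cluster.partitions_for_topic('topic_1')
leader = client.cluster.leader_for_partition(TopicPartition('topic_1', 0))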
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
deleted file mode 100644
index a983ce1..0000000
--- a/test/test_client_integration.py
+++ /dev/null
@@ -1,95 +0,0 @@
-import os
-
-import pytest
-
-from kafka.errors import KafkaTimeoutError
-from kafka.protocol import create_message
-from kafka.structs import (
- FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
- ProduceRequestPayload)
-
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, env_kafka_version
-
-
-class TestKafkaClientIntegration(KafkaIntegrationTestCase):
- @classmethod
- def setUpClass(cls): # noqa
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk)
-
- @classmethod
- def tearDownClass(cls): # noqa
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.server.close()
- cls.zk.close()
-
- def test_consume_none(self):
- fetch = FetchRequestPayload(self.topic, 0, 0, 1024)
-
- fetch_resp, = self.client.send_fetch_request([fetch])
- self.assertEqual(fetch_resp.error, 0)
- self.assertEqual(fetch_resp.topic, self.topic)
- self.assertEqual(fetch_resp.partition, 0)
-
- messages = list(fetch_resp.messages)
- self.assertEqual(len(messages), 0)
-
- def test_ensure_topic_exists(self):
-
- # assume that self.topic was created by setUp
- # if so, this should succeed
- self.client.ensure_topic_exists(self.topic, timeout=1)
-
- # ensure_topic_exists should fail with KafkaTimeoutError
- with self.assertRaises(KafkaTimeoutError):
- self.client.ensure_topic_exists('this_topic_doesnt_exist', timeout=0)
-
- def test_send_produce_request_maintains_request_response_order(self):
-
- self.client.ensure_topic_exists('foo')
- self.client.ensure_topic_exists('bar')
-
- requests = [
- ProduceRequestPayload(
- 'foo', 0,
- [create_message(b'a'), create_message(b'b')]),
- ProduceRequestPayload(
- 'bar', 1,
- [create_message(b'a'), create_message(b'b')]),
- ProduceRequestPayload(
- 'foo', 1,
- [create_message(b'a'), create_message(b'b')]),
- ProduceRequestPayload(
- 'bar', 0,
- [create_message(b'a'), create_message(b'b')]),
- ]
-
- responses = self.client.send_produce_request(requests)
- while len(responses):
- request = requests.pop()
- response = responses.pop()
- self.assertEqual(request.topic, response.topic)
- self.assertEqual(request.partition, response.partition)
-
-
- ####################
- # Offset Tests #
- ####################
-
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_commit_fetch_offsets(self):
- req = OffsetCommitRequestPayload(self.topic, 0, 42, 'metadata')
- (resp,) = self.client.send_offset_commit_request('group', [req])
- self.assertEqual(resp.error, 0)
-
- req = OffsetFetchRequestPayload(self.topic, 0)
- (resp,) = self.client.send_offset_fetch_request('group', [req])
- self.assertEqual(resp.error, 0)
- self.assertEqual(resp.offset, 42)
- self.assertEqual(resp.metadata, '') # Metadata isn't stored for now
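[Migration note: the offset commit/fetch round-trip above maps onto KafkaConsumer in the modern API. A sketch, assuming a reachable broker at localhost:9092 (placeholder address) and a topic named 'my-topic'.]

from kafka import KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndMetadata

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='group')
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])
consumer.commit({tp: OffsetAndMetadata(42, 'metadata')})
assert consumer.committed(tp) == 42  # committed() returns the stored offset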
diff --git a/test/test_consumer.py b/test/test_consumer.py
index edcc2d8..436fe55 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -1,15 +1,7 @@
-import sys
-
-from mock import MagicMock, patch
-from . import unittest
import pytest
-from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
-from kafka.errors import (
- FailedPayloadsError, KafkaConfigurationError, NotLeaderForPartitionError,
- UnknownTopicOrPartitionError)
-from kafka.structs import (
- FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload)
+from kafka import KafkaConsumer
+from kafka.errors import KafkaConfigurationError
class TestKafkaConsumer:
@@ -32,126 +24,3 @@ class TestKafkaConsumer:
assert sub == set(['foo'])
sub.add('fizz')
assert consumer.subscription() == set(['foo'])
-
-
-class TestMultiProcessConsumer(unittest.TestCase):
- @unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')
- def test_partition_list(self):
- client = MagicMock()
- partitions = (0,)
- with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
- MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
- self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
- self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
-
-
-class TestSimpleConsumer(unittest.TestCase):
- def test_non_integer_partitions(self):
- with self.assertRaises(AssertionError):
- SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])
-
- def test_simple_consumer_failed_payloads(self):
- client = MagicMock()
- consumer = SimpleConsumer(client, group=None,
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- def failed_payloads(payload):
- return FailedPayloadsError(payload)
-
- client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads)
-
- # This should not raise an exception
- consumer.get_messages(5)
-
- def test_simple_consumer_leader_change(self):
- client = MagicMock()
- consumer = SimpleConsumer(client, group=None,
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # Mock so that only the first request gets a valid response
- def not_leader(request):
- return FetchResponsePayload(request.topic, request.partition,
- NotLeaderForPartitionError.errno, -1, ())
-
- client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader)
-
- # This should not raise an exception
- consumer.get_messages(20)
-
- # client should have updated metadata
- self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1)
- self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1)
-
- def test_simple_consumer_unknown_topic_partition(self):
- client = MagicMock()
- consumer = SimpleConsumer(client, group=None,
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # Mock so that only the first request gets a valid response
- def unknown_topic_partition(request):
- return FetchResponsePayload(request.topic, request.partition,
- UnknownTopicOrPartitionError.errno, -1, ())
-
- client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition)
-
- # This should not raise an exception
- with self.assertRaises(UnknownTopicOrPartitionError):
- consumer.get_messages(20)
-
- def test_simple_consumer_commit_does_not_raise(self):
- client = MagicMock()
- client.get_partition_ids_for_topic.return_value = [0, 1]
-
- def mock_offset_fetch_request(group, payloads, **kwargs):
- return [OffsetFetchResponsePayload(p.topic, p.partition, 0, b'', 0) for p in payloads]
-
- client.send_offset_fetch_request.side_effect = mock_offset_fetch_request
-
- def mock_offset_commit_request(group, payloads, **kwargs):
- raise FailedPayloadsError(payloads[0])
-
- client.send_offset_commit_request.side_effect = mock_offset_commit_request
-
- consumer = SimpleConsumer(client, group='foobar',
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # Mock internal commit check
- consumer.count_since_commit = 10
-
- # This should not raise an exception
- self.assertFalse(consumer.commit(partitions=[0, 1]))
-
- def test_simple_consumer_reset_partition_offset(self):
- client = MagicMock()
-
- def mock_offset_request(payloads, **kwargs):
- raise FailedPayloadsError(payloads[0])
-
- client.send_offset_request.side_effect = mock_offset_request
-
- consumer = SimpleConsumer(client, group='foobar',
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # This should not raise an exception
- self.assertEqual(consumer.reset_partition_offset(0), None)
-
- @staticmethod
- def fail_requests_factory(error_factory):
- # Mock so that only the first request gets a valid response
- def fail_requests(payloads, **kwargs):
- responses = [
- FetchResponsePayload(payloads[0].topic, payloads[0].partition, 0, 0,
- [OffsetAndMessage(
- payloads[0].offset + i,
- "msg %d" % (payloads[0].offset + i))
- for i in range(10)]),
- ]
- for failure in payloads[1:]:
- responses.append(error_factory(failure))
- return responses
- return fail_requests
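[Migration note: the surviving unit tests keep the same broker-free style by exercising config validation. One more case in that style, as a sketch; the config key is deliberately bogus, and api_version is pinned so no version probe is attempted.]

def test_unrecognized_config_raises():
    with pytest.raises(KafkaConfigurationError):
        KafkaConsumer(bootstrap_servers='localhost:9092',
                      api_version=(0, 10),
                      not_a_real_config=1)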
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index d6fd41c..6e6bc94 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,29 +1,17 @@
import logging
-import os
import time
from mock import patch
import pytest
from kafka.vendor.six.moves import range
-from . import unittest
-from kafka import (
- KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
- create_gzip_message, KafkaProducer
-)
import kafka.codec
-from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import (
- ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
- KafkaTimeoutError, UnsupportedCodecError
-)
-from kafka.protocol.message import PartialMessage
-from kafka.structs import (
- ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
+ KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError
)
+from kafka.structs import TopicPartition, OffsetAndTimestamp
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, Timer, assert_message_count, env_kafka_version, random_string
+from test.testutil import Timer, assert_message_count, env_kafka_version, random_string
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@@ -63,486 +51,6 @@ def test_kafka_consumer_unsupported_encoding(
consumer.poll(timeout_ms=2000)
-class TestConsumerIntegration(KafkaIntegrationTestCase):
- maxDiff = None
-
- @classmethod
- def setUpClass(cls):
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.zk = ZookeeperFixture.instance()
- chroot = random_string(10)
- cls.server1 = KafkaFixture.instance(0, cls.zk,
- zk_chroot=chroot)
- cls.server2 = KafkaFixture.instance(1, cls.zk,
- zk_chroot=chroot)
-
- cls.server = cls.server1 # Bootstrapping server
-
- @classmethod
- def tearDownClass(cls):
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.server1.close()
- cls.server2.close()
- cls.zk.close()
-
- def send_messages(self, partition, messages):
- messages = [ create_message(self.msg(str(msg))) for msg in messages ]
- produce = ProduceRequestPayload(self.topic, partition, messages = messages)
- resp, = self.client.send_produce_request([produce])
- self.assertEqual(resp.error, 0)
-
- return [ x.value for x in messages ]
-
- def send_gzip_message(self, partition, messages):
- message = create_gzip_message([(self.msg(str(msg)), None) for msg in messages])
- produce = ProduceRequestPayload(self.topic, partition, messages = [message])
- resp, = self.client.send_produce_request([produce])
- self.assertEqual(resp.error, 0)
-
- def assert_message_count(self, messages, num_messages):
- # Make sure we got them all
- self.assertEqual(len(messages), num_messages)
-
- # Make sure there are no duplicates
- self.assertEqual(len(set(messages)), num_messages)
-
- def consumer(self, **kwargs):
- if os.environ['KAFKA_VERSION'] == "0.8.0":
- # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
- kwargs['group'] = None
- kwargs['auto_commit'] = False
- else:
- kwargs.setdefault('group', None)
- kwargs.setdefault('auto_commit', False)
-
- consumer_class = kwargs.pop('consumer', SimpleConsumer)
- group = kwargs.pop('group', None)
- topic = kwargs.pop('topic', self.topic)
-
- if consumer_class in [SimpleConsumer, MultiProcessConsumer]:
- kwargs.setdefault('iter_timeout', 0)
-
- return consumer_class(self.client, group, topic, **kwargs)
-
- def kafka_consumer(self, **configs):
- brokers = '%s:%d' % (self.server.host, self.server.port)
- consumer = KafkaConsumer(self.topic,
- bootstrap_servers=brokers,
- **configs)
- return consumer
-
- def kafka_producer(self, **configs):
- brokers = '%s:%d' % (self.server.host, self.server.port)
- producer = KafkaProducer(
- bootstrap_servers=brokers, **configs)
- return producer
-
- def test_simple_consumer(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer = self.consumer()
-
- self.assert_message_count([ message for message in consumer ], 200)
-
- consumer.stop()
-
- def test_simple_consumer_gzip(self):
- self.send_gzip_message(0, range(0, 100))
- self.send_gzip_message(1, range(100, 200))
-
- # Start a consumer
- consumer = self.consumer()
-
- self.assert_message_count([ message for message in consumer ], 200)
-
- consumer.stop()
-
- def test_simple_consumer_smallest_offset_reset(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- consumer = self.consumer(auto_offset_reset='smallest')
- # Move fetch offset 300 messages past the end (out of range)
- consumer.seek(300, 2)
- # Since auto_offset_reset is set to smallest we should read all 200
- # messages from beginning.
- self.assert_message_count([message for message in consumer], 200)
-
- def test_simple_consumer_largest_offset_reset(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Default largest
- consumer = self.consumer()
- # Move fetch offset 300 messages past the end (out of range)
- consumer.seek(300, 2)
- # Since auto_offset_reset is set to largest we should not read any
- # messages.
- self.assert_message_count([message for message in consumer], 0)
- # Send 200 new messages to the queue
- self.send_messages(0, range(200, 300))
- self.send_messages(1, range(300, 400))
- # Since the offset is set to largest we should read all the new messages.
- self.assert_message_count([message for message in consumer], 200)
-
- def test_simple_consumer_no_reset(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Default largest
- consumer = self.consumer(auto_offset_reset=None)
- # Move fetch offset 300 messages past the end (out of range)
- consumer.seek(300, 2)
- with self.assertRaises(OffsetOutOfRangeError):
- consumer.get_message()
-
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_simple_consumer_load_initial_offsets(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Create 1st consumer and change offsets
- consumer = self.consumer(group='test_simple_consumer_load_initial_offsets')
- self.assertEqual(consumer.offsets, {0: 0, 1: 0})
- consumer.offsets.update({0:51, 1:101})
- # Update counter after manual offsets update
- consumer.count_since_commit += 1
- consumer.commit()
-
- # Create 2nd consumer and check initial offsets
- consumer = self.consumer(group='test_simple_consumer_load_initial_offsets',
- auto_commit=False)
- self.assertEqual(consumer.offsets, {0: 51, 1: 101})
-
- def test_simple_consumer__seek(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- consumer = self.consumer()
-
- # Rewind 10 messages from the end
- consumer.seek(-10, 2)
- self.assert_message_count([ message for message in consumer ], 10)
-
- # Rewind 13 messages from the end
- consumer.seek(-13, 2)
- self.assert_message_count([ message for message in consumer ], 13)
-
- # Set absolute offset
- consumer.seek(100)
- self.assert_message_count([ message for message in consumer ], 0)
- consumer.seek(100, partition=0)
- self.assert_message_count([ message for message in consumer ], 0)
- consumer.seek(101, partition=1)
- self.assert_message_count([ message for message in consumer ], 0)
- consumer.seek(90, partition=0)
- self.assert_message_count([ message for message in consumer ], 10)
- consumer.seek(20, partition=1)
- self.assert_message_count([ message for message in consumer ], 80)
- consumer.seek(0, partition=1)
- self.assert_message_count([ message for message in consumer ], 100)
-
- consumer.stop()
-
- @pytest.mark.skipif(env_kafka_version() >= (2, 0),
- reason="SimpleConsumer blocking does not handle PartialMessage change in kafka 2.0+")
- def test_simple_consumer_blocking(self):
- consumer = self.consumer()
-
- # Ask for 5 messages, nothing in queue, block 1 second
- with Timer() as t:
- messages = consumer.get_messages(block=True, timeout=1)
- self.assert_message_count(messages, 0)
- self.assertGreaterEqual(t.interval, 1)
-
- self.send_messages(0, range(0, 5))
- self.send_messages(1, range(5, 10))
-
- # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
- with Timer() as t:
- messages = consumer.get_messages(count=5, block=True, timeout=3)
- self.assert_message_count(messages, 5)
- self.assertLess(t.interval, 3)
-
- # Ask for 10 messages, get 5 back, block 1 second
- with Timer() as t:
- messages = consumer.get_messages(count=10, block=True, timeout=1)
- self.assert_message_count(messages, 5)
- self.assertGreaterEqual(t.interval, 1)
-
- # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
- # second, get 5 back, no blocking
- self.send_messages(0, range(0, 3))
- self.send_messages(1, range(3, 5))
- with Timer() as t:
- messages = consumer.get_messages(count=10, block=1, timeout=1)
- self.assert_message_count(messages, 5)
- self.assertLessEqual(t.interval, 1)
-
- consumer.stop()
-
- def test_simple_consumer_pending(self):
- # make sure that we start with no pending messages
- consumer = self.consumer()
- self.assertEquals(consumer.pending(), 0)
- self.assertEquals(consumer.pending(partitions=[0]), 0)
- self.assertEquals(consumer.pending(partitions=[1]), 0)
-
- # Produce 10 messages to partitions 0 and 1
- self.send_messages(0, range(0, 10))
- self.send_messages(1, range(10, 20))
-
- consumer = self.consumer()
-
- self.assertEqual(consumer.pending(), 20)
- self.assertEqual(consumer.pending(partitions=[0]), 10)
- self.assertEqual(consumer.pending(partitions=[1]), 10)
-
- # move to last message, so one partition should have 1 pending
- # message and the other 0
- consumer.seek(-1, 2)
- self.assertEqual(consumer.pending(), 1)
-
- pending_part1 = consumer.pending(partitions=[0])
- pending_part2 = consumer.pending(partitions=[1])
- self.assertEquals(set([0, 1]), set([pending_part1, pending_part2]))
- consumer.stop()
-
- @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- def test_multi_process_consumer(self):
- # Produce 100 messages to partitions 0 and 1
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- consumer = self.consumer(consumer = MultiProcessConsumer)
-
- self.assert_message_count([ message for message in consumer ], 200)
-
- consumer.stop()
-
- @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- def test_multi_process_consumer_blocking(self):
- consumer = self.consumer(consumer = MultiProcessConsumer)
-
- # Ask for 5 messages, No messages in queue, block 1 second
- with Timer() as t:
- messages = consumer.get_messages(block=True, timeout=1)
- self.assert_message_count(messages, 0)
-
- self.assertGreaterEqual(t.interval, 1)
-
- # Send 10 messages
- self.send_messages(0, range(0, 10))
-
- # Ask for 5 messages, 10 messages in queue, block 0 seconds
- with Timer() as t:
- messages = consumer.get_messages(count=5, block=True, timeout=5)
- self.assert_message_count(messages, 5)
- self.assertLessEqual(t.interval, 1)
-
- # Ask for 10 messages, 5 in queue, block 1 second
- with Timer() as t:
- messages = consumer.get_messages(count=10, block=True, timeout=1)
- self.assert_message_count(messages, 5)
- self.assertGreaterEqual(t.interval, 1)
-
- # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
- # second, get at least one back, no blocking
- self.send_messages(0, range(0, 5))
- with Timer() as t:
- messages = consumer.get_messages(count=10, block=1, timeout=1)
- received_message_count = len(messages)
- self.assertGreaterEqual(received_message_count, 1)
- self.assert_message_count(messages, received_message_count)
- self.assertLessEqual(t.interval, 1)
-
- consumer.stop()
-
- @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- def test_multi_proc_pending(self):
- self.send_messages(0, range(0, 10))
- self.send_messages(1, range(10, 20))
-
- # set group to None and auto_commit to False to avoid interactions w/
- # offset commit/fetch apis
- consumer = MultiProcessConsumer(self.client, None, self.topic,
- auto_commit=False, iter_timeout=0)
-
- self.assertEqual(consumer.pending(), 20)
- self.assertEqual(consumer.pending(partitions=[0]), 10)
- self.assertEqual(consumer.pending(partitions=[1]), 10)
-
- consumer.stop()
-
- @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_multi_process_consumer_load_initial_offsets(self):
- self.send_messages(0, range(0, 10))
- self.send_messages(1, range(10, 20))
-
- # Create 1st consumer and change offsets
- consumer = self.consumer(group='test_multi_process_consumer_load_initial_offsets')
- self.assertEqual(consumer.offsets, {0: 0, 1: 0})
- consumer.offsets.update({0:5, 1:15})
- # Update counter after manual offsets update
- consumer.count_since_commit += 1
- consumer.commit()
-
- # Create 2nd consumer and check initial offsets
- consumer = self.consumer(consumer = MultiProcessConsumer,
- group='test_multi_process_consumer_load_initial_offsets',
- auto_commit=False)
- self.assertEqual(consumer.offsets, {0: 5, 1: 15})
-
- def test_large_messages(self):
- # Produce 10 "normal" size messages
- small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
-
- # Produce 10 messages that are large (bigger than default fetch size)
- large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
-
- # Brokers prior to 0.11 will return the next message
- # if it is smaller than max_bytes (called buffer_size in SimpleConsumer)
- # Brokers 0.11 and later that store messages in v2 format
- # internally will return the next message only if the
- # full MessageSet is smaller than max_bytes.
- # For that reason, we set the max buffer size to a little more
- # than the size of all large messages combined
- consumer = self.consumer(max_buffer_size=60000)
-
- expected_messages = set(small_messages + large_messages)
- actual_messages = set([x.message.value for x in consumer
- if not isinstance(x.message, PartialMessage)])
- self.assertEqual(expected_messages, actual_messages)
-
- consumer.stop()
-
- def test_huge_messages(self):
- huge_message, = self.send_messages(0, [
- create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
- ])
-
- # Create a consumer with the default buffer size
- consumer = self.consumer()
-
- # This consumer fails to get the message
- with self.assertRaises(ConsumerFetchSizeTooSmall):
- consumer.get_message(False, 0.1)
-
- consumer.stop()
-
- # Create a consumer with no fetch size limit
- big_consumer = self.consumer(
- max_buffer_size = None,
- partitions = [0],
- )
-
- # Seek to the last message
- big_consumer.seek(-1, 2)
-
- # Consume giant message successfully
- message = big_consumer.get_message(block=False, timeout=10)
- self.assertIsNotNone(message)
- self.assertEqual(message.message.value, huge_message)
-
- big_consumer.stop()
-
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_offset_behavior__resuming_behavior(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer1 = self.consumer(
- group='test_offset_behavior__resuming_behavior',
- auto_commit=True,
- auto_commit_every_t = None,
- auto_commit_every_n = 20,
- )
-
- # Grab the first 195 messages
- output_msgs1 = [ consumer1.get_message().message.value for _ in range(195) ]
- self.assert_message_count(output_msgs1, 195)
-
- # The total offset across both partitions should be at 180
- consumer2 = self.consumer(
- group='test_offset_behavior__resuming_behavior',
- auto_commit=True,
- auto_commit_every_t = None,
- auto_commit_every_n = 20,
- )
-
- # 181-200
- self.assert_message_count([ message for message in consumer2 ], 20)
-
- consumer1.stop()
- consumer2.stop()
-
- @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_multi_process_offset_behavior__resuming_behavior(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer1 = self.consumer(
- consumer=MultiProcessConsumer,
- group='test_multi_process_offset_behavior__resuming_behavior',
- auto_commit=True,
- auto_commit_every_t = None,
- auto_commit_every_n = 20,
- )
-
- # Grab the first 195 messages
- output_msgs1 = []
- idx = 0
- for message in consumer1:
- output_msgs1.append(message.message.value)
- idx += 1
- if idx >= 195:
- break
- self.assert_message_count(output_msgs1, 195)
-
- # The total offset across both partitions should be at 180
- consumer2 = self.consumer(
- consumer=MultiProcessConsumer,
- group='test_multi_process_offset_behavior__resuming_behavior',
- auto_commit=True,
- auto_commit_every_t = None,
- auto_commit_every_n = 20,
- )
-
- # 181-200
- self.assert_message_count([ message for message in consumer2 ], 20)
-
- consumer1.stop()
- consumer2.stop()
-
- # TODO: Make this a unit test -- should not require integration
- def test_fetch_buffer_size(self):
-
- # Test parameters (see issue 135 / PR 136)
- TEST_MESSAGE_SIZE=1048
- INIT_BUFFER_SIZE=1024
- MAX_BUFFER_SIZE=2048
- assert TEST_MESSAGE_SIZE > INIT_BUFFER_SIZE
- assert TEST_MESSAGE_SIZE < MAX_BUFFER_SIZE
- assert MAX_BUFFER_SIZE == 2 * INIT_BUFFER_SIZE
-
- self.send_messages(0, [ "x" * TEST_MESSAGE_SIZE ])
- self.send_messages(1, [ "x" * TEST_MESSAGE_SIZE ])
-
- consumer = self.consumer(buffer_size=INIT_BUFFER_SIZE, max_buffer_size=MAX_BUFFER_SIZE)
- messages = [ message for message in consumer ]
- self.assertEqual(len(messages), 2)
-
-
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages):
TIMEOUT_MS = 500
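[Migration note: the SimpleConsumer seek/pending idioms deleted above map onto KafkaConsumer roughly as follows. A sketch, assuming a reachable broker at localhost:9092 (placeholder) and an assigned partition; end_offsets needs a broker new enough to serve ListOffsets v1.]

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         enable_auto_commit=False)
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])
consumer.seek_to_beginning(tp)   # auto_offset_reset='smallest' analogue
consumer.seek_to_end(tp)         # 'largest' analogue
consumer.seek(tp, 100)           # absolute seek, like SimpleConsumer.seek(100)
pending = consumer.end_offsets([tp])[tp] - consumer.position(tp)  # pending() analogue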
diff --git a/test/test_context.py b/test/test_context.py
deleted file mode 100644
index 3d41ba6..0000000
--- a/test/test_context.py
+++ /dev/null
@@ -1,117 +0,0 @@
-"""
-OffsetCommitContext tests.
-"""
-from . import unittest
-
-from mock import MagicMock, patch
-
-from kafka.context import OffsetCommitContext
-from kafka.errors import OffsetOutOfRangeError
-
-
-class TestOffsetCommitContext(unittest.TestCase):
- """
- OffsetCommitContext tests.
- """
-
- def setUp(self):
- self.client = MagicMock()
- self.consumer = MagicMock()
- self.topic = "topic"
- self.group = "group"
- self.partition = 0
- self.consumer.topic = self.topic
- self.consumer.group = self.group
- self.consumer.client = self.client
- self.consumer.offsets = {self.partition: 0}
- self.context = OffsetCommitContext(self.consumer)
-
- def test_noop(self):
- """
- Should revert consumer after context exit with no mark() call.
- """
- with self.context:
- # advance offset
- self.consumer.offsets = {self.partition: 1}
-
- # offset restored
- self.assertEqual(self.consumer.offsets, {self.partition: 0})
- # and seek called with relative zero delta
- self.assertEqual(self.consumer.seek.call_count, 1)
- self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
-
- def test_mark(self):
- """
- Should remain at marked location after context exit.
- """
- with self.context as context:
- context.mark(self.partition, 0)
- # advance offset
- self.consumer.offsets = {self.partition: 1}
-
- # offset sent to client
- self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
-
- # offset remains advanced
- self.assertEqual(self.consumer.offsets, {self.partition: 1})
-
- # and seek called with relative zero delta
- self.assertEqual(self.consumer.seek.call_count, 1)
- self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
-
- def test_mark_multiple(self):
- """
- Should remain at highest marked location after context exit.
- """
- with self.context as context:
- context.mark(self.partition, 0)
- context.mark(self.partition, 1)
- context.mark(self.partition, 2)
- # advance offset
- self.consumer.offsets = {self.partition: 3}
-
- # offset sent to client
- self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
-
- # offset remains advanced
- self.assertEqual(self.consumer.offsets, {self.partition: 3})
-
- # and seek called with relative zero delta
- self.assertEqual(self.consumer.seek.call_count, 1)
- self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
-
- def test_rollback(self):
- """
- Should rollback to initial offsets on context exit with exception.
- """
- with self.assertRaises(Exception):
- with self.context as context:
- context.mark(self.partition, 0)
- # advance offset
- self.consumer.offsets = {self.partition: 1}
-
- raise Exception("Intentional failure")
-
- # offset rolled back (ignoring mark)
- self.assertEqual(self.consumer.offsets, {self.partition: 0})
-
- # and seek called with relative zero delta
- self.assertEqual(self.consumer.seek.call_count, 1)
- self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
-
- def test_out_of_range(self):
- """
- Should reset to beginning of valid offsets on `OffsetOutOfRangeError`
- """
- def _seek(offset, whence):
- # seek must be called with 0, 0 to find the beginning of the range
- self.assertEqual(offset, 0)
- self.assertEqual(whence, 0)
- # set offsets to something different
- self.consumer.offsets = {self.partition: 100}
-
- with patch.object(self.consumer, "seek", _seek):
- with self.context:
- raise OffsetOutOfRangeError()
-
- self.assertEqual(self.consumer.offsets, {self.partition: 100})
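[Migration note: OffsetCommitContext's commit-on-success semantics can be approximated with manual commits on KafkaConsumer. A sketch; process() is a hypothetical per-message handler and the addresses are placeholders.]

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', group_id='group',
                         bootstrap_servers='localhost:9092',
                         enable_auto_commit=False)
for message in consumer:
    process(message)   # hypothetical handler; raise to skip the commit
    consumer.commit()  # commit positions only after successful processing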
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
deleted file mode 100644
index ad7dcb9..0000000
--- a/test/test_failover_integration.py
+++ /dev/null
@@ -1,240 +0,0 @@
-import logging
-import os
-import time
-
-from kafka import SimpleClient, SimpleConsumer, KeyedProducer
-from kafka.errors import (
- FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
- NotLeaderForPartitionError)
-from kafka.producer.base import Producer
-from kafka.structs import TopicPartition
-
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, random_string
-
-
-log = logging.getLogger(__name__)
-
-
-class TestFailover(KafkaIntegrationTestCase):
- create_client = False
-
- def setUp(self):
- if not os.environ.get('KAFKA_VERSION'):
- self.skipTest('integration test requires KAFKA_VERSION')
-
- zk_chroot = random_string(10)
- replicas = 3
- partitions = 3
-
- # mini zookeeper, 3 kafka brokers
- self.zk = ZookeeperFixture.instance()
- kk_kwargs = {'zk_chroot': zk_chroot, 'replicas': replicas,
- 'partitions': partitions}
- self.brokers = [KafkaFixture.instance(i, self.zk, **kk_kwargs)
- for i in range(replicas)]
-
- hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
- self.client = SimpleClient(hosts, timeout=2)
- super(TestFailover, self).setUp()
-
- def tearDown(self):
- super(TestFailover, self).tearDown()
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- self.client.close()
- for broker in self.brokers:
- broker.close()
- self.zk.close()
-
- def test_switch_leader(self):
- topic = self.topic
- partition = 0
-
- # Testing the base Producer class here so that we can easily send
- # messages to a specific partition, kill the leader for that partition
- # and check that after another broker takes leadership the producer
- # is able to resume sending messages
-
- # require that the server commit messages to all in-sync replicas
- # so that failover doesn't lose any messages on server-side
- # and we can assert that server-side message count equals client-side
- producer = Producer(self.client, async_send=False,
- req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
-
- # Send 100 random messages to a specific partition
- self._send_random_messages(producer, topic, partition, 100)
-
- # kill leader for partition
- self._kill_leader(topic, partition)
-
- # expect failure, but don't wait more than 60 secs to recover
- recovered = False
- started = time.time()
- timeout = 60
- while not recovered and (time.time() - started) < timeout:
- try:
- log.debug("attempting to send 'success' message after leader killed")
- producer.send_messages(topic, partition, b'success')
- log.debug("success!")
- recovered = True
- except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
- NotLeaderForPartitionError):
- log.debug("caught exception sending message -- will retry")
- continue
-
- # Verify we successfully sent the message
- self.assertTrue(recovered)
-
- # send some more messages to new leader
- self._send_random_messages(producer, topic, partition, 100)
-
- # count number of messages
- # Should be equal to 100 before + 1 recovery + 100 after
- # at_least=True because exactly once delivery isn't really a thing
- self.assert_message_count(topic, 201, partitions=(partition,),
- at_least=True)
-
- def test_switch_leader_async(self):
- topic = self.topic
- partition = 0
-
- # Test the base class Producer -- send_messages to a specific partition
- producer = Producer(self.client, async_send=True,
- batch_send_every_n=15,
- batch_send_every_t=3,
- req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
- async_log_messages_on_error=False)
-
- # Send 10 random messages
- self._send_random_messages(producer, topic, partition, 10)
- self._send_random_messages(producer, topic, partition + 1, 10)
-
- # kill leader for partition
- self._kill_leader(topic, partition)
-
- log.debug("attempting to send 'success' message after leader killed")
-
- # in async mode, this should return immediately
- producer.send_messages(topic, partition, b'success')
- producer.send_messages(topic, partition + 1, b'success')
-
- # send to new leader
- self._send_random_messages(producer, topic, partition, 10)
- self._send_random_messages(producer, topic, partition + 1, 10)
-
- # Stop the producer and wait for it to shutdown
- producer.stop()
- started = time.time()
- timeout = 60
- while (time.time() - started) < timeout:
- if not producer.thread.is_alive():
- break
- time.sleep(0.1)
- else:
- self.fail('timeout waiting for producer queue to empty')
-
- # count number of messages
- # Should be equal to 10 before + 1 recovery + 10 after
- # at_least=True because exactly once delivery isn't really a thing
- self.assert_message_count(topic, 21, partitions=(partition,),
- at_least=True)
- self.assert_message_count(topic, 21, partitions=(partition + 1,),
- at_least=True)
-
- def test_switch_leader_keyed_producer(self):
- topic = self.topic
-
- producer = KeyedProducer(self.client, async_send=False)
-
- # Send 10 random messages
- for _ in range(10):
- key = random_string(3).encode('utf-8')
- msg = random_string(10).encode('utf-8')
- producer.send_messages(topic, key, msg)
-
- # kill leader for partition 0
- self._kill_leader(topic, 0)
-
- recovered = False
- started = time.time()
- timeout = 60
- while not recovered and (time.time() - started) < timeout:
- try:
- key = random_string(3).encode('utf-8')
- msg = random_string(10).encode('utf-8')
- producer.send_messages(topic, key, msg)
- if producer.partitioners[topic].partition(key) == 0:
- recovered = True
- except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
- NotLeaderForPartitionError):
- log.debug("caught exception sending message -- will retry")
- continue
-
- # Verify we successfully sent the message
- self.assertTrue(recovered)
-
- # send some more messages just to make sure no more exceptions
- for _ in range(10):
- key = random_string(3).encode('utf-8')
- msg = random_string(10).encode('utf-8')
- producer.send_messages(topic, key, msg)
-
- def test_switch_leader_simple_consumer(self):
- producer = Producer(self.client, async_send=False)
- consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
- self._send_random_messages(producer, self.topic, 0, 2)
- consumer.get_messages()
- self._kill_leader(self.topic, 0)
- consumer.get_messages()
-
- def _send_random_messages(self, producer, topic, partition, n):
- for j in range(n):
- msg = 'msg {0}: {1}'.format(j, random_string(10))
- log.debug('_send_random_message %s to %s:%d', msg, topic, partition)
- while True:
- try:
- producer.send_messages(topic, partition, msg.encode('utf-8'))
- except Exception:
- log.exception('failure in _send_random_messages - retrying')
- continue
- else:
- break
-
- def _kill_leader(self, topic, partition):
- leader = self.client.topics_to_brokers[TopicPartition(topic, partition)]
- broker = self.brokers[leader.nodeId]
- broker.close()
- return broker
-
- def assert_message_count(self, topic, check_count, timeout=10,
- partitions=None, at_least=False):
- hosts = ','.join(['%s:%d' % (broker.host, broker.port)
- for broker in self.brokers])
-
- client = SimpleClient(hosts, timeout=2)
- consumer = SimpleConsumer(client, None, topic,
- partitions=partitions,
- auto_commit=False,
- iter_timeout=timeout)
-
- started_at = time.time()
- pending = -1
- while pending < check_count and (time.time() - started_at < timeout):
- try:
- pending = consumer.pending(partitions)
- except FailedPayloadsError:
- pass
- time.sleep(0.5)
-
- consumer.stop()
- client.close()
-
- if pending < check_count:
- self.fail('Too few pending messages: found %d, expected %d' %
- (pending, check_count))
- elif pending > check_count and not at_least:
- self.fail('Too many pending messages: found %d, expected %d' %
- (pending, check_count))
- return True
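[Migration note: the failover guarantees exercised above come from producer acks and retries in the modern API. A sketch of the equivalent settings; addresses and topic are placeholders.]

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['broker1:9092', 'broker2:9092'],
                         acks='all',   # like Producer.ACK_AFTER_CLUSTER_COMMIT
                         retries=5)    # retry through leader elections
future = producer.send('my-topic', b'success', partition=0)
record_metadata = future.get(timeout=60)  # block until acked, or raise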
diff --git a/test/test_package.py b/test/test_package.py
index e520f3f..aa42c9c 100644
--- a/test/test_package.py
+++ b/test/test_package.py
@@ -6,20 +6,20 @@ class TestPackage:
assert kafka1.codec.__name__ == "kafka.codec"
def test_submodule_namespace(self):
- import kafka.client as client1
- assert client1.__name__ == "kafka.client"
+ import kafka.client_async as client1
+ assert client1.__name__ == "kafka.client_async"
- from kafka import client as client2
- assert client2.__name__ == "kafka.client"
+ from kafka import client_async as client2
+ assert client2.__name__ == "kafka.client_async"
- from kafka.client import SimpleClient as SimpleClient1
- assert SimpleClient1.__name__ == "SimpleClient"
+ from kafka.client_async import KafkaClient as KafkaClient1
+ assert KafkaClient1.__name__ == "KafkaClient"
+
+ from kafka import KafkaClient as KafkaClient2
+ assert KafkaClient2.__name__ == "KafkaClient"
from kafka.codec import gzip_encode as gzip_encode1
assert gzip_encode1.__name__ == "gzip_encode"
- from kafka import SimpleClient as SimpleClient2
- assert SimpleClient2.__name__ == "SimpleClient"
-
from kafka.codec import snappy_encode
assert snappy_encode.__name__ == "snappy_encode"
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
index 3a5264b..853fbf6 100644
--- a/test/test_partitioner.py
+++ b/test/test_partitioner.py
@@ -2,8 +2,7 @@ from __future__ import absolute_import
import pytest
-from kafka.partitioner import DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner
-from kafka.partitioner.hashed import murmur2
+from kafka.partitioner import DefaultPartitioner, murmur2
def test_default_partitioner():
@@ -22,45 +21,15 @@ def test_default_partitioner():
assert partitioner(None, all_partitions, []) in all_partitions
-def test_roundrobin_partitioner():
- partitioner = RoundRobinPartitioner()
- all_partitions = available = list(range(100))
- # partitioner should cycle between partitions
- i = 0
- max_partition = all_partitions[len(all_partitions) - 1]
- while i <= max_partition:
- assert i == partitioner(None, all_partitions, available)
- i += 1
-
- i = 0
- while i <= int(max_partition / 2):
- assert i == partitioner(None, all_partitions, available)
- i += 1
-
- # test dynamic partition re-assignment
- available = available[:-25]
-
- while i <= max(available):
- assert i == partitioner(None, all_partitions, available)
- i += 1
-
- all_partitions = list(range(200))
- available = all_partitions
-
- max_partition = all_partitions[len(all_partitions) - 1]
- while i <= max_partition:
- assert i == partitioner(None, all_partitions, available)
- i += 1
-
-
@pytest.mark.parametrize("bytes_payload,partition_number", [
(b'', 681), (b'a', 524), (b'ab', 434), (b'abc', 107), (b'123456789', 566),
(b'\x00 ', 742)
])
def test_murmur2_java_compatibility(bytes_payload, partition_number):
- p = Murmur2Partitioner(range(1000))
+ partitioner = DefaultPartitioner()
+ all_partitions = available = list(range(1000))
# compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
- assert p.partition(bytes_payload) == partition_number
+ assert partitioner(bytes_payload, all_partitions, available) == partition_number
def test_murmur2_not_ascii():
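[Note: the rewritten test above relies on DefaultPartitioner computing the same hash as the Java client. The underlying rule, spelled out as a sketch using the murmur2 helper this diff now imports from kafka.partitioner.]

from kafka.partitioner import murmur2

def java_compatible_partition(key_bytes, num_partitions):
    # non-negative murmur2 hash modulo the partition count, matching
    # org.apache.kafka.clients.producer.internals.DefaultPartitioner
    return (murmur2(key_bytes) & 0x7fffffff) % num_partitions

assert java_compatible_partition(b'abc', 1000) == 107  # agrees with the parametrized table above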
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
deleted file mode 100644
index 8f32cf8..0000000
--- a/test/test_producer_integration.py
+++ /dev/null
@@ -1,529 +0,0 @@
-import os
-import time
-import uuid
-
-import pytest
-from kafka.vendor.six.moves import range
-
-from kafka import (
- SimpleProducer, KeyedProducer,
- create_message, create_gzip_message, create_snappy_message,
- RoundRobinPartitioner, HashedPartitioner
-)
-from kafka.codec import has_snappy
-from kafka.errors import UnknownTopicOrPartitionError, LeaderNotAvailableError
-from kafka.producer.base import Producer
-from kafka.protocol.message import PartialMessage
-from kafka.structs import FetchRequestPayload, ProduceRequestPayload
-
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset
-
-
-# TODO: This duplicates a TestKafkaProducerIntegration method temporarily
-# while the migration to pytest is in progress
-def assert_produce_request(client, topic, messages, initial_offset, message_ct,
- partition=0):
- """Verify the correctness of a produce request
- """
- produce = ProduceRequestPayload(topic, partition, messages=messages)
-
- # There should only be one response message from the server.
- # This will throw an exception if there's more than one.
- resp = client.send_produce_request([produce])
- assert_produce_response(resp, initial_offset)
-
- assert current_offset(client, topic, partition) == initial_offset + message_ct
-
-
-def assert_produce_response(resp, initial_offset):
- """Verify that a produce response is well-formed
- """
- assert len(resp) == 1
- assert resp[0].error == 0
- assert resp[0].offset == initial_offset
-
-
-@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-def test_produce_many_simple(simple_client, topic):
- """Test multiple produces using the SimpleClient
- """
- start_offset = current_offset(simple_client, topic, 0)
-
- assert_produce_request(
- simple_client, topic,
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(100)],
- start_offset,
- 100,
- )
-
- assert_produce_request(
- simple_client, topic,
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(100)],
- start_offset+100,
- 100,
- )
-
-
-class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
-
- @classmethod
- def setUpClass(cls): # noqa
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk)
-
- @classmethod
- def tearDownClass(cls): # noqa
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.server.close()
- cls.zk.close()
-
- def test_produce_10k_simple(self):
- start_offset = self.current_offset(self.topic, 0)
-
- self.assert_produce_request(
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(10000)],
- start_offset,
- 10000,
- )
-
- def test_produce_many_gzip(self):
- start_offset = self.current_offset(self.topic, 0)
-
- message1 = create_gzip_message([
- (("Gzipped 1 %d" % i).encode('utf-8'), None) for i in range(100)])
- message2 = create_gzip_message([
- (("Gzipped 2 %d" % i).encode('utf-8'), None) for i in range(100)])
-
- self.assert_produce_request(
- [ message1, message2 ],
- start_offset,
- 200,
- )
-
- def test_produce_many_snappy(self):
- self.skipTest("All snappy integration tests fail with nosnappyjava")
- start_offset = self.current_offset(self.topic, 0)
-
- self.assert_produce_request([
- create_snappy_message([("Snappy 1 %d" % i, None) for i in range(100)]),
- create_snappy_message([("Snappy 2 %d" % i, None) for i in range(100)]),
- ],
- start_offset,
- 200,
- )
-
- def test_produce_mixed(self):
- start_offset = self.current_offset(self.topic, 0)
-
- msg_count = 1+100
- messages = [
- create_message(b"Just a plain message"),
- create_gzip_message([
- (("Gzipped %d" % i).encode('utf-8'), None) for i in range(100)]),
- ]
-
- # All snappy integration tests fail with nosnappyjava
- if False and has_snappy():
- msg_count += 100
- messages.append(create_snappy_message([("Snappy %d" % i, None) for i in range(100)]))
-
- self.assert_produce_request(messages, start_offset, msg_count)
-
- def test_produce_100k_gzipped(self):
- start_offset = self.current_offset(self.topic, 0)
-
- self.assert_produce_request([
- create_gzip_message([
- (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
- for i in range(50000)])
- ],
- start_offset,
- 50000,
- )
-
- self.assert_produce_request([
- create_gzip_message([
- (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
- for i in range(50000)])
- ],
- start_offset+50000,
- 50000,
- )
-
- ############################
- # SimpleProducer Tests #
- ############################
-
- def test_simple_producer_new_topic(self):
- producer = SimpleProducer(self.client)
- resp = producer.send_messages('new_topic', self.msg('foobar'))
- self.assert_produce_response(resp, 0)
- producer.stop()
-
- def test_simple_producer(self):
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- producer = SimpleProducer(self.client, random_start=False)
-
- # Goes to first partition, randomly.
- resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
- self.assert_produce_response(resp, start_offsets[0])
-
- # Goes to the next partition, randomly.
- resp = producer.send_messages(self.topic, self.msg("three"))
- self.assert_produce_response(resp, start_offsets[1])
-
- self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two") ])
- self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("three") ])
-
- # Goes back to the first partition because there's only two partitions
- resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
- self.assert_produce_response(resp, start_offsets[0]+2)
- self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
-
- producer.stop()
-
- def test_producer_random_order(self):
- producer = SimpleProducer(self.client, random_start=True)
- resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
- resp2 = producer.send_messages(self.topic, self.msg("three"))
- resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
-
- self.assertEqual(resp1[0].partition, resp3[0].partition)
- self.assertNotEqual(resp1[0].partition, resp2[0].partition)
-
- def test_producer_ordered_start(self):
- producer = SimpleProducer(self.client, random_start=False)
- resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
- resp2 = producer.send_messages(self.topic, self.msg("three"))
- resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
-
- self.assertEqual(resp1[0].partition, 0)
- self.assertEqual(resp2[0].partition, 1)
- self.assertEqual(resp3[0].partition, 0)
-
- def test_async_simple_producer(self):
- partition = self.client.get_partition_ids_for_topic(self.topic)[0]
- start_offset = self.current_offset(self.topic, partition)
-
- producer = SimpleProducer(self.client, async_send=True, random_start=False)
- resp = producer.send_messages(self.topic, self.msg("one"))
- self.assertEqual(len(resp), 0)
-
- # flush messages
- producer.stop()
-
- self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
-
- def test_batched_simple_producer__triggers_by_message(self):
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- # Configure batch producer
- batch_messages = 5
- batch_interval = 5
- producer = SimpleProducer(
- self.client,
- async_send=True,
- batch_send_every_n=batch_messages,
- batch_send_every_t=batch_interval,
- random_start=False)
-
- # Send 4 messages -- should not trigger a batch
- resp = producer.send_messages(
- self.topic,
- self.msg("one"),
- self.msg("two"),
- self.msg("three"),
- self.msg("four"),
- )
-
- # Batch mode is async. No ack
- self.assertEqual(len(resp), 0)
-
- # It hasn't sent yet
- self.assert_fetch_offset(partitions[0], start_offsets[0], [])
- self.assert_fetch_offset(partitions[1], start_offsets[1], [])
-
- # send 3 more messages -- should trigger batch on first 5
- resp = producer.send_messages(
- self.topic,
- self.msg("five"),
- self.msg("six"),
- self.msg("seven"),
- )
-
- # Batch mode is async. No ack
- self.assertEqual(len(resp), 0)
-
- # Wait until producer has pulled all messages from internal queue
- # this should signal that the first batch was sent, and the producer
- # is now waiting for enough messages to batch again (or a timeout)
- timeout = 5
- start = time.time()
- while not producer.queue.empty():
- if time.time() - start > timeout:
- self.fail('timeout waiting for producer queue to empty')
- time.sleep(0.1)
-
- # send messages groups all *msgs in a single call to the same partition
- # so we should see all messages from the first call in one partition
- self.assert_fetch_offset(partitions[0], start_offsets[0], [
- self.msg("one"),
- self.msg("two"),
- self.msg("three"),
- self.msg("four"),
- ])
-
- # Because we are batching every 5 messages, we should only see one
- self.assert_fetch_offset(partitions[1], start_offsets[1], [
- self.msg("five"),
- ])
-
- producer.stop()
-
- def test_batched_simple_producer__triggers_by_time(self):
- self.skipTest("Flakey test -- should be refactored or removed")
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- batch_interval = 5
- producer = SimpleProducer(
- self.client,
- async_send=True,
- batch_send_every_n=100,
- batch_send_every_t=batch_interval,
- random_start=False)
-
- # Send 5 messages and do a fetch
- resp = producer.send_messages(
- self.topic,
- self.msg("one"),
- self.msg("two"),
- self.msg("three"),
- self.msg("four"),
- )
-
- # Batch mode is async. No ack
- self.assertEqual(len(resp), 0)
-
- # It hasn't sent yet
- self.assert_fetch_offset(partitions[0], start_offsets[0], [])
- self.assert_fetch_offset(partitions[1], start_offsets[1], [])
-
- resp = producer.send_messages(self.topic,
- self.msg("five"),
- self.msg("six"),
- self.msg("seven"),
- )
-
- # Batch mode is async. No ack
- self.assertEqual(len(resp), 0)
-
- # Wait the timeout out
- time.sleep(batch_interval)
-
- self.assert_fetch_offset(partitions[0], start_offsets[0], [
- self.msg("one"),
- self.msg("two"),
- self.msg("three"),
- self.msg("four"),
- ])
-
- self.assert_fetch_offset(partitions[1], start_offsets[1], [
- self.msg("five"),
- self.msg("six"),
- self.msg("seven"),
- ])
-
- producer.stop()
-
-
- ############################
- # KeyedProducer Tests #
- ############################
-
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_keyedproducer_null_payload(self):
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
- key = "test"
-
- resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
- self.assert_produce_response(resp, start_offsets[0])
- resp = producer.send_messages(self.topic, self.key("key2"), None)
- self.assert_produce_response(resp, start_offsets[1])
- resp = producer.send_messages(self.topic, self.key("key3"), None)
- self.assert_produce_response(resp, start_offsets[0]+1)
- resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
- self.assert_produce_response(resp, start_offsets[1]+1)
-
- self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ])
- self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ])
-
- producer.stop()
-
- def test_round_robin_partitioner(self):
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
- resp1 = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
- resp2 = producer.send_messages(self.topic, self.key("key2"), self.msg("two"))
- resp3 = producer.send_messages(self.topic, self.key("key3"), self.msg("three"))
- resp4 = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
-
- self.assert_produce_response(resp1, start_offsets[0]+0)
- self.assert_produce_response(resp2, start_offsets[1]+0)
- self.assert_produce_response(resp3, start_offsets[0]+1)
- self.assert_produce_response(resp4, start_offsets[1]+1)
-
- self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("three") ])
- self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("two"), self.msg("four") ])
-
- producer.stop()
-
- def test_hashed_partitioner(self):
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
- resp1 = producer.send_messages(self.topic, self.key("1"), self.msg("one"))
- resp2 = producer.send_messages(self.topic, self.key("2"), self.msg("two"))
- resp3 = producer.send_messages(self.topic, self.key("3"), self.msg("three"))
- resp4 = producer.send_messages(self.topic, self.key("3"), self.msg("four"))
- resp5 = producer.send_messages(self.topic, self.key("4"), self.msg("five"))
-
- offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
- messages = {partitions[0]: [], partitions[1]: []}
-
- keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
- resps = [resp1, resp2, resp3, resp4, resp5]
- msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
-
- for key, resp, msg in zip(keys, resps, msgs):
- k = hash(key) % 2
- partition = partitions[k]
- offset = offsets[partition]
- self.assert_produce_response(resp, offset)
- offsets[partition] += 1
- messages[partition].append(msg)
-
- self.assert_fetch_offset(partitions[0], start_offsets[0], messages[partitions[0]])
- self.assert_fetch_offset(partitions[1], start_offsets[1], messages[partitions[1]])
-
- producer.stop()
-
- def test_async_keyed_producer(self):
- partition = self.client.get_partition_ids_for_topic(self.topic)[0]
- start_offset = self.current_offset(self.topic, partition)
-
- producer = KeyedProducer(self.client,
- partitioner=RoundRobinPartitioner,
- async_send=True,
- batch_send_every_t=1)
-
- resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
- self.assertEqual(len(resp), 0)
-
- # wait for the server to report a new highwatermark
- while self.current_offset(self.topic, partition) == start_offset:
- time.sleep(0.1)
-
- self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
- producer.stop()
-
- ############################
- # Producer ACK Tests #
- ############################
-
- def test_acks_none(self):
- partition = self.client.get_partition_ids_for_topic(self.topic)[0]
- start_offset = self.current_offset(self.topic, partition)
-
- producer = Producer(
- self.client,
- req_acks=Producer.ACK_NOT_REQUIRED,
- )
- resp = producer.send_messages(self.topic, partition, self.msg("one"))
-
- # No response from produce request with no acks required
- self.assertEqual(len(resp), 0)
-
- # But the message should still have been delivered
- self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
- producer.stop()
-
- def test_acks_local_write(self):
- partition = self.client.get_partition_ids_for_topic(self.topic)[0]
- start_offset = self.current_offset(self.topic, partition)
-
- producer = Producer(
- self.client,
- req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
- )
- resp = producer.send_messages(self.topic, partition, self.msg("one"))
-
- self.assert_produce_response(resp, start_offset)
- self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
- producer.stop()
-
- def test_acks_cluster_commit(self):
- partition = self.client.get_partition_ids_for_topic(self.topic)[0]
- start_offset = self.current_offset(self.topic, partition)
-
- producer = Producer(
- self.client,
- req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
- )
-
- resp = producer.send_messages(self.topic, partition, self.msg("one"))
- self.assert_produce_response(resp, start_offset)
- self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
- producer.stop()
-
- def assert_produce_request(self, messages, initial_offset, message_ct,
- partition=0):
- produce = ProduceRequestPayload(self.topic, partition, messages=messages)
-
- # There should only be one response message from the server.
- # This will throw an exception if there's more than one.
- resp = self.client.send_produce_request([ produce ])
- self.assert_produce_response(resp, initial_offset)
-
- self.assertEqual(self.current_offset(self.topic, partition), initial_offset + message_ct)
-
- def assert_produce_response(self, resp, initial_offset):
- self.assertEqual(len(resp), 1)
- self.assertEqual(resp[0].error, 0)
- self.assertEqual(resp[0].offset, initial_offset)
-
- def assert_fetch_offset(self, partition, start_offset, expected_messages):
- # There should only be one response message from the server.
- # This will throw an exception if there's more than one.
-
- resp, = self.client.send_fetch_request([FetchRequestPayload(self.topic, partition, start_offset, 1024)])
-
- self.assertEqual(resp.error, 0)
- self.assertEqual(resp.partition, partition)
- messages = [ x.message.value for x in resp.messages
- if not isinstance(x.message, PartialMessage) ]
-
- self.assertEqual(messages, expected_messages)
- self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages))
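The deleted suite above drove SimpleProducer and KeyedProducer against a live broker; equivalent round trips now go through KafkaProducer. A hedged sketch, assuming a broker at localhost:9092 and a placeholder topic:

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# send() returns a future; get() blocks until the broker acks and
# yields RecordMetadata with the assigned partition and offset.
future = producer.send('test-topic', key=b'key1', value=b'one')
metadata = future.get(timeout=10)
print(metadata.topic, metadata.partition, metadata.offset)

producer.close()
```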
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
deleted file mode 100644
index ab80ee7..0000000
--- a/test/test_producer_legacy.py
+++ /dev/null
@@ -1,257 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import collections
-import logging
-import threading
-import time
-
-from mock import MagicMock, patch
-from . import unittest
-
-from kafka import SimpleClient, SimpleProducer, KeyedProducer
-from kafka.errors import (
- AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError)
-from kafka.producer.base import Producer, _send_upstream
-from kafka.protocol import CODEC_NONE
-from kafka.structs import (
- ProduceResponsePayload, RetryOptions, TopicPartition)
-
-from kafka.vendor.six.moves import queue, range
-
-
-class TestKafkaProducer(unittest.TestCase):
- def test_producer_message_types(self):
-
- producer = Producer(MagicMock())
- topic = b"test-topic"
- partition = 0
-
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
- ('a', 'tuple'), {'a': 'dict'}, None,)
- for m in bad_data_types:
- with self.assertRaises(TypeError):
- logging.debug("attempting to send message of type %s", type(m))
- producer.send_messages(topic, partition, m)
-
- good_data_types = (b'a string!',)
- for m in good_data_types:
- # This should not raise an exception
- producer.send_messages(topic, partition, m)
-
- def test_keyedproducer_message_types(self):
- client = MagicMock()
- client.get_partition_ids_for_topic.return_value = [0, 1]
- producer = KeyedProducer(client)
- topic = b"test-topic"
- key = b"testkey"
-
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
- ('a', 'tuple'), {'a': 'dict'},)
- for m in bad_data_types:
- with self.assertRaises(TypeError):
- logging.debug("attempting to send message of type %s", type(m))
- producer.send_messages(topic, key, m)
-
- good_data_types = (b'a string!', None,)
- for m in good_data_types:
- # This should not raise an exception
- producer.send_messages(topic, key, m)
-
- def test_topic_message_types(self):
- client = MagicMock()
-
- def partitions(topic):
- return [0, 1]
-
- client.get_partition_ids_for_topic = partitions
-
- producer = SimpleProducer(client, random_start=False)
- topic = b"test-topic"
- producer.send_messages(topic, b'hi')
- assert client.send_produce_request.called
-
- @patch('kafka.producer.base._send_upstream')
- def test_producer_async_queue_overfilled(self, mock):
- queue_size = 2
- producer = Producer(MagicMock(), async_send=True,
- async_queue_maxsize=queue_size)
-
- topic = b'test-topic'
- partition = 0
- message = b'test-message'
-
- with self.assertRaises(AsyncProducerQueueFull):
- message_list = [message] * (queue_size + 1)
- producer.send_messages(topic, partition, *message_list)
- self.assertEqual(producer.queue.qsize(), queue_size)
- for _ in range(producer.queue.qsize()):
- producer.queue.get()
-
- def test_producer_sync_fail_on_error(self):
- error = FailedPayloadsError('failure')
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- with patch.object(SimpleClient, 'ensure_topic_exists'):
- with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
- with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
-
- client = SimpleClient(MagicMock())
- producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False)
-
- # This should not raise
- (response,) = producer.send_messages('foobar', b'test message')
- self.assertEqual(response, error)
-
- producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True)
- with self.assertRaises(FailedPayloadsError):
- producer.send_messages('foobar', b'test message')
-
- def test_cleanup_is_not_called_on_stopped_producer(self):
- producer = Producer(MagicMock(), async_send=True)
- producer.stopped = True
- with patch.object(producer, 'stop') as mocked_stop:
- producer._cleanup_func(producer)
- self.assertEqual(mocked_stop.call_count, 0)
-
- def test_cleanup_is_called_on_running_producer(self):
- producer = Producer(MagicMock(), async_send=True)
- producer.stopped = False
- with patch.object(producer, 'stop') as mocked_stop:
- producer._cleanup_func(producer)
- self.assertEqual(mocked_stop.call_count, 1)
-
-
-class TestKafkaProducerSendUpstream(unittest.TestCase):
-
- def setUp(self):
- self.client = MagicMock()
- self.queue = queue.Queue()
-
- def _run_process(self, retries_limit=3, sleep_timeout=1):
- # run _send_upstream process with the queue
- stop_event = threading.Event()
- retry_options = RetryOptions(limit=retries_limit,
- backoff_ms=50,
- retry_on_timeouts=False)
- self.thread = threading.Thread(
- target=_send_upstream,
- args=(self.queue, self.client, CODEC_NONE,
- 0.3, # batch time (seconds)
- 3, # batch length
- Producer.ACK_AFTER_LOCAL_WRITE,
- Producer.DEFAULT_ACK_TIMEOUT,
- retry_options,
- stop_event))
- self.thread.daemon = True
- self.thread.start()
- time.sleep(sleep_timeout)
- stop_event.set()
-
- def test_wo_retries(self):
-
- # let's create a queue and add 10 messages for 1 partition
- for i in range(10):
- self.queue.put((TopicPartition("test", 0), "msg %i", "key %i"))
-
- self._run_process()
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 4 non-void calls:
- # 3 batches of 3 msgs each + 1 batch of 1 message
- self.assertEqual(self.client.send_produce_request.call_count, 4)
-
- def test_first_send_failed(self):
-
- # let's create a queue and add 10 messages for 10 different partitions
- # to show how retries should work ideally
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
-
- # Mock offsets counter for closure
- offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
- self.client.is_first_time = True
- def send_side_effect(reqs, *args, **kwargs):
- if self.client.is_first_time:
- self.client.is_first_time = False
- return [FailedPayloadsError(req) for req in reqs]
- responses = []
- for req in reqs:
- offset = offsets[req.topic][req.partition]
- offsets[req.topic][req.partition] += len(req.messages)
- responses.append(
- ProduceResponsePayload(req.topic, req.partition, 0, offset)
- )
- return responses
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(2)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 5 non-void calls: 1st failed batch of 3 msgs
- # plus 3 batches of 3 msgs each + 1 batch of 1 message
- self.assertEqual(self.client.send_produce_request.call_count, 5)
-
- def test_with_limited_retries(self):
-
- # let's create a queue and add 10 messages for 10 different partitions
- # to show how retries should work ideally
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i))
-
- def send_side_effect(reqs, *args, **kwargs):
- return [FailedPayloadsError(req) for req in reqs]
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(3, 3)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 16 non-void calls:
- # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
- # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
- self.assertEqual(self.client.send_produce_request.call_count, 16)
-
- def test_async_producer_not_leader(self):
-
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
-
- # Mock offsets counter for closure
- offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
- self.client.is_first_time = True
- def send_side_effect(reqs, *args, **kwargs):
- if self.client.is_first_time:
- self.client.is_first_time = False
- return [ProduceResponsePayload(req.topic, req.partition,
- NotLeaderForPartitionError.errno, -1)
- for req in reqs]
-
- responses = []
- for req in reqs:
- offset = offsets[req.topic][req.partition]
- offsets[req.topic][req.partition] += len(req.messages)
- responses.append(
- ProduceResponsePayload(req.topic, req.partition, 0, offset)
- )
- return responses
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(2)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 5 non-void calls: 1st failed batch of 3 msgs
- # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
- self.assertEqual(self.client.send_produce_request.call_count, 5)
-
- def tearDown(self):
- for _ in range(self.queue.qsize()):
- self.queue.get()
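The deleted legacy tests exercised the async sender's batching and retry loop (_send_upstream). The modern producer expresses roughly the same behavior through configuration; the mapping below is approximate and the values are illustrative:

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # placeholder address
    acks=1,                # ~ Producer.ACK_AFTER_LOCAL_WRITE
    retries=3,             # ~ RetryOptions(limit=3)
    retry_backoff_ms=50,   # ~ RetryOptions(backoff_ms=50)
    linger_ms=300,         # ~ the 0.3s batch window in _run_process above
)
```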
diff --git a/test/test_protocol_legacy.py b/test/test_protocol_legacy.py
deleted file mode 100644
index 1341af0..0000000
--- a/test/test_protocol_legacy.py
+++ /dev/null
@@ -1,848 +0,0 @@
-#pylint: skip-file
-from contextlib import contextmanager
-import struct
-
-from kafka.vendor import six
-from mock import patch, sentinel
-from . import unittest
-
-from kafka.codec import has_snappy, gzip_decode, snappy_decode
-from kafka.errors import (
- ChecksumError, KafkaUnavailableError, UnsupportedCodecError,
- ConsumerFetchSizeTooSmall, ProtocolError)
-from kafka.protocol import (
- ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
- create_message, create_gzip_message, create_snappy_message,
- create_message_set)
-from kafka.structs import (
- OffsetRequestPayload, OffsetResponsePayload,
- OffsetCommitRequestPayload, OffsetCommitResponsePayload,
- OffsetFetchRequestPayload, OffsetFetchResponsePayload,
- ProduceRequestPayload, ProduceResponsePayload,
- FetchRequestPayload, FetchResponsePayload,
- Message, OffsetAndMessage, BrokerMetadata, ConsumerMetadataResponse)
-
-
-class TestProtocol(unittest.TestCase):
- def test_create_message(self):
- payload = "test"
- key = "key"
- msg = create_message(payload, key)
- self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, 0)
- self.assertEqual(msg.key, key)
- self.assertEqual(msg.value, payload)
-
- def test_create_gzip(self):
- payloads = [(b"v1", None), (b"v2", None)]
- msg = create_gzip_message(payloads)
- self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
- self.assertEqual(msg.key, None)
- # Need to decode to check since gzipped payload is non-deterministic
- decoded = gzip_decode(msg.value)
- expect = b"".join([
- struct.pack(">q", 0), # MsgSet offset
- struct.pack(">i", 16), # MsgSet size
- struct.pack(">i", 1285512130), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", -1), # -1 indicates a null key
- struct.pack(">i", 2), # Msg length (bytes)
- b"v1", # Message contents
-
- struct.pack(">q", 0), # MsgSet offset
- struct.pack(">i", 16), # MsgSet size
- struct.pack(">i", -711587208), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", -1), # -1 indicates a null key
- struct.pack(">i", 2), # Msg length (bytes)
- b"v2", # Message contents
- ])
-
- self.assertEqual(decoded, expect)
-
- def test_create_gzip_keyed(self):
- payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
- msg = create_gzip_message(payloads)
- self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
- self.assertEqual(msg.key, None)
- # Need to decode to check since gzipped payload is non-deterministic
- decoded = gzip_decode(msg.value)
- expect = b"".join([
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", 1474775406), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k1", # Key
- struct.pack(">i", 2), # Length of value
- b"v1", # Value
-
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k2", # Key
- struct.pack(">i", 2), # Length of value
- b"v2", # Value
- ])
-
- self.assertEqual(decoded, expect)
-
- @unittest.skipUnless(has_snappy(), "Snappy not available")
- def test_create_snappy(self):
- payloads = [(b"v1", None), (b"v2", None)]
- msg = create_snappy_message(payloads)
- self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
- self.assertEqual(msg.key, None)
- decoded = snappy_decode(msg.value)
- expect = b"".join([
- struct.pack(">q", 0), # MsgSet offset
- struct.pack(">i", 16), # MsgSet size
- struct.pack(">i", 1285512130), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", -1), # -1 indicates a null key
- struct.pack(">i", 2), # Msg length (bytes)
- b"v1", # Message contents
-
- struct.pack(">q", 0), # MsgSet offset
- struct.pack(">i", 16), # MsgSet size
- struct.pack(">i", -711587208), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", -1), # -1 indicates a null key
- struct.pack(">i", 2), # Msg length (bytes)
- b"v2", # Message contents
- ])
-
- self.assertEqual(decoded, expect)
-
- @unittest.skipUnless(has_snappy(), "Snappy not available")
- def test_create_snappy_keyed(self):
- payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
- msg = create_snappy_message(payloads)
- self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
- self.assertEqual(msg.key, None)
- decoded = snappy_decode(msg.value)
- expect = b"".join([
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", 1474775406), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k1", # Key
- struct.pack(">i", 2), # Length of value
- b"v1", # Value
-
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k2", # Key
- struct.pack(">i", 2), # Length of value
- b"v2", # Value
- ])
-
- self.assertEqual(decoded, expect)
-
- def test_encode_message_header(self):
- expect = b"".join([
- struct.pack(">h", 10), # API Key
- struct.pack(">h", 0), # API Version
- struct.pack(">i", 4), # Correlation Id
- struct.pack(">h", len("client3")), # Length of clientId
- b"client3", # ClientId
- ])
-
- encoded = KafkaProtocol._encode_message_header(b"client3", 4, 10)
- self.assertEqual(encoded, expect)
-
- def test_encode_message(self):
- message = create_message(b"test", b"key")
- encoded = KafkaProtocol._encode_message(message)
- expect = b"".join([
- struct.pack(">i", -1427009701), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 3), # Length of key
- b"key", # key
- struct.pack(">i", 4), # Length of value
- b"test", # value
- ])
-
- self.assertEqual(encoded, expect)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message(self):
- encoded = b"".join([
- struct.pack(">i", -1427009701), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 3), # Length of key
- b"key", # key
- struct.pack(">i", 4), # Length of value
- b"test", # value
- ])
-
- offset = 10
- (returned_offset, decoded_message) = list(KafkaProtocol._decode_message(encoded, offset))[0]
-
- self.assertEqual(returned_offset, offset)
- self.assertEqual(decoded_message, create_message(b"test", b"key"))
-
- def test_encode_message_failure(self):
- with self.assertRaises(ProtocolError):
- KafkaProtocol._encode_message(Message(1, 0, "key", "test"))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_message_set(self):
- message_set = [
- create_message(b"v1", b"k1"),
- create_message(b"v2", b"k2")
- ]
-
- encoded = KafkaProtocol._encode_message_set(message_set)
- expect = b"".join([
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", 1474775406), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k1", # Key
- struct.pack(">i", 2), # Length of value
- b"v1", # Value
-
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k2", # Key
- struct.pack(">i", 2), # Length of value
- b"v2", # Value
- ])
-
- self.assertEqual(encoded, expect)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message_set(self):
- encoded = b"".join([
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", 1474775406), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k1", # Key
- struct.pack(">i", 2), # Length of value
- b"v1", # Value
-
- struct.pack(">q", 1), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k2", # Key
- struct.pack(">i", 2), # Length of value
- b"v2", # Value
- ])
-
- msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
- self.assertEqual(len(msgs), 2)
- msg1, msg2 = msgs
-
- returned_offset1, decoded_message1 = msg1
- returned_offset2, decoded_message2 = msg2
-
- self.assertEqual(returned_offset1, 0)
- self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
-
- self.assertEqual(returned_offset2, 1)
- self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message_gzip(self):
- gzip_encoded = (b'\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
- b'\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
- b'\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8'
- b'\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00'
- b'\x00')
- offset = 11
- messages = list(KafkaProtocol._decode_message(gzip_encoded, offset))
-
- self.assertEqual(len(messages), 2)
- msg1, msg2 = messages
-
- returned_offset1, decoded_message1 = msg1
- self.assertEqual(returned_offset1, 0)
- self.assertEqual(decoded_message1, create_message(b"v1"))
-
- returned_offset2, decoded_message2 = msg2
- self.assertEqual(returned_offset2, 0)
- self.assertEqual(decoded_message2, create_message(b"v2"))
-
- @unittest.skip('needs updating for new protocol classes')
- @unittest.skipUnless(has_snappy(), "Snappy not available")
- def test_decode_message_snappy(self):
- snappy_encoded = (b'\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00'
- b'\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff'
- b'\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5'
- b'\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2')
- offset = 11
- messages = list(KafkaProtocol._decode_message(snappy_encoded, offset))
- self.assertEqual(len(messages), 2)
-
- msg1, msg2 = messages
-
- returned_offset1, decoded_message1 = msg1
- self.assertEqual(returned_offset1, 0)
- self.assertEqual(decoded_message1, create_message(b"v1"))
-
- returned_offset2, decoded_message2 = msg2
- self.assertEqual(returned_offset2, 0)
- self.assertEqual(decoded_message2, create_message(b"v2"))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message_checksum_error(self):
- invalid_encoded_message = b"This is not a valid encoded message"
- iter = KafkaProtocol._decode_message(invalid_encoded_message, 0)
- self.assertRaises(ChecksumError, list, iter)
-
- # NOTE: The error handling in _decode_message_set_iter() is questionable.
- # If it's modified, the next two tests might need to be fixed.
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message_set_fetch_size_too_small(self):
- with self.assertRaises(ConsumerFetchSizeTooSmall):
- list(KafkaProtocol._decode_message_set_iter('a'))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message_set_stop_iteration(self):
- encoded = b"".join([
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", 1474775406), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k1", # Key
- struct.pack(">i", 2), # Length of value
- b"v1", # Value
-
- struct.pack(">q", 1), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k2", # Key
- struct.pack(">i", 2), # Length of value
- b"v2", # Value
- b"@1$%(Y!", # Random padding
- ])
-
- msgs = MessageSet.decode(io.BytesIO(encoded))
- self.assertEqual(len(msgs), 2)
- msg1, msg2 = msgs
-
- returned_offset1, msg_size1, decoded_message1 = msg1
- returned_offset2, msg_size2, decoded_message2 = msg2
-
- self.assertEqual(returned_offset1, 0)
- self.assertEqual(decoded_message1.value, b"v1")
- self.assertEqual(decoded_message1.key, b"k1")
-
- self.assertEqual(returned_offset2, 1)
- self.assertEqual(decoded_message2.value, b"v2")
- self.assertEqual(decoded_message2.key, b"k2")
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_produce_request(self):
- requests = [
- ProduceRequestPayload("topic1", 0, [
- kafka.protocol.message.Message(b"a"),
- kafka.protocol.message.Message(b"b")
- ]),
- ProduceRequestPayload("topic2", 1, [
- kafka.protocol.message.Message(b"c")
- ])
- ]
-
- msg_a_binary = KafkaProtocol._encode_message(create_message(b"a"))
- msg_b_binary = KafkaProtocol._encode_message(create_message(b"b"))
- msg_c_binary = KafkaProtocol._encode_message(create_message(b"c"))
-
- header = b"".join([
- struct.pack('>i', 0x94), # The length of the message overall
- struct.pack('>h', 0), # Msg Header, Message type = Produce
- struct.pack('>h', 0), # Msg Header, API version
- struct.pack('>i', 2), # Msg Header, Correlation ID
- struct.pack('>h7s', 7, b"client1"), # Msg Header, The client ID
- struct.pack('>h', 2), # Num acks required
- struct.pack('>i', 100), # Request Timeout
- struct.pack('>i', 2), # The number of requests
- ])
-
- total_len = len(msg_a_binary) + len(msg_b_binary)
- topic1 = b"".join([
- struct.pack('>h6s', 6, b'topic1'), # The topic1
- struct.pack('>i', 1), # One message set
- struct.pack('>i', 0), # Partition 0
- struct.pack('>i', total_len + 24), # Size of the incoming message set
- struct.pack('>q', 0), # No offset specified
- struct.pack('>i', len(msg_a_binary)), # Length of message
- msg_a_binary, # Actual message
- struct.pack('>q', 0), # No offset specified
- struct.pack('>i', len(msg_b_binary)), # Length of message
- msg_b_binary, # Actual message
- ])
-
- topic2 = b"".join([
- struct.pack('>h6s', 6, b'topic2'), # The topic2
- struct.pack('>i', 1), # One message set
- struct.pack('>i', 1), # Partition 1
- struct.pack('>i', len(msg_c_binary) + 12), # Size of the incoming message set
- struct.pack('>q', 0), # No offset specified
- struct.pack('>i', len(msg_c_binary)), # Length of message
- msg_c_binary, # Actual message
- ])
-
- expected1 = b"".join([ header, topic1, topic2 ])
- expected2 = b"".join([ header, topic2, topic1 ])
-
- encoded = KafkaProtocol.encode_produce_request(b"client1", 2, requests, 2, 100)
- self.assertIn(encoded, [ expected1, expected2 ])
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_produce_response(self):
- t1 = b"topic1"
- t2 = b"topic2"
- _long = int
- if six.PY2:
- _long = long
- encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)),
- 2, 2, len(t1), t1, 2, 0, 0, _long(10), 1, 1, _long(20),
- len(t2), t2, 1, 0, 0, _long(30))
- responses = list(KafkaProtocol.decode_produce_response(encoded))
- self.assertEqual(responses,
- [ProduceResponse(t1, 0, 0, _long(10)),
- ProduceResponse(t1, 1, 1, _long(20)),
- ProduceResponse(t2, 0, 0, _long(30))])
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_fetch_request(self):
- requests = [
- FetchRequest(b"topic1", 0, 10, 1024),
- FetchRequest(b"topic2", 1, 20, 100),
- ]
-
- header = b"".join([
- struct.pack('>i', 89), # The length of the message overall
- struct.pack('>h', 1), # Msg Header, Message type = Fetch
- struct.pack('>h', 0), # Msg Header, API version
- struct.pack('>i', 3), # Msg Header, Correlation ID
- struct.pack('>h7s', 7, b"client1"),# Msg Header, The client ID
- struct.pack('>i', -1), # Replica Id
- struct.pack('>i', 2), # Max wait time
- struct.pack('>i', 100), # Min bytes
- struct.pack('>i', 2), # Num requests
- ])
-
- topic1 = b"".join([
- struct.pack('>h6s', 6, b'topic1'),# Topic
- struct.pack('>i', 1), # Num Payloads
- struct.pack('>i', 0), # Partition 0
- struct.pack('>q', 10), # Offset
- struct.pack('>i', 1024), # Max Bytes
- ])
-
- topic2 = b"".join([
- struct.pack('>h6s', 6, b'topic2'),# Topic
- struct.pack('>i', 1), # Num Payloads
- struct.pack('>i', 1), # Partition 1
- struct.pack('>q', 20), # Offset
- struct.pack('>i', 100), # Max Bytes
- ])
-
- expected1 = b"".join([ header, topic1, topic2 ])
- expected2 = b"".join([ header, topic2, topic1 ])
-
- encoded = KafkaProtocol.encode_fetch_request(b"client1", 3, requests, 2, 100)
- self.assertIn(encoded, [ expected1, expected2 ])
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_fetch_response(self):
- t1 = b"topic1"
- t2 = b"topic2"
- msgs = [create_message(msg)
- for msg in [b"message1", b"hi", b"boo", b"foo", b"so fun!"]]
- ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]])
- ms2 = KafkaProtocol._encode_message_set([msgs[2]])
- ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]])
-
- encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' %
- (len(t1), len(ms1), len(ms2), len(t2), len(ms3)),
- 4, 2, len(t1), t1, 2, 0, 0, 10, len(ms1), ms1, 1,
- 1, 20, len(ms2), ms2, len(t2), t2, 1, 0, 0, 30,
- len(ms3), ms3)
-
- responses = list(KafkaProtocol.decode_fetch_response(encoded))
- def expand_messages(response):
- return FetchResponsePayload(response.topic, response.partition,
- response.error, response.highwaterMark,
- list(response.messages))
-
- expanded_responses = list(map(expand_messages, responses))
- expect = [FetchResponsePayload(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
- OffsetAndMessage(0, msgs[1])]),
- FetchResponsePayload(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
- FetchResponsePayload(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
- OffsetAndMessage(0, msgs[4])])]
- self.assertEqual(expanded_responses, expect)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_metadata_request_no_topics(self):
- expected = b"".join([
- struct.pack(">i", 17), # Total length of the request
- struct.pack('>h', 3), # API key metadata fetch
- struct.pack('>h', 0), # API version
- struct.pack('>i', 4), # Correlation ID
- struct.pack('>h3s', 3, b"cid"),# The client ID
- struct.pack('>i', 0), # No topics, give all the data!
- ])
-
- encoded = KafkaProtocol.encode_metadata_request(b"cid", 4)
-
- self.assertEqual(encoded, expected)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_metadata_request_with_topics(self):
- expected = b"".join([
- struct.pack(">i", 25), # Total length of the request
- struct.pack('>h', 3), # API key metadata fetch
- struct.pack('>h', 0), # API version
- struct.pack('>i', 4), # Correlation ID
- struct.pack('>h3s', 3, b"cid"),# The client ID
- struct.pack('>i', 2), # Number of topics in the request
- struct.pack('>h2s', 2, b"t1"), # Topic "t1"
- struct.pack('>h2s', 2, b"t2"), # Topic "t2"
- ])
-
- encoded = KafkaProtocol.encode_metadata_request(b"cid", 4, [b"t1", b"t2"])
-
- self.assertEqual(encoded, expected)
-
- def _create_encoded_metadata_response(self, brokers, topics):
- encoded = []
- encoded.append(struct.pack('>ii', 3, len(brokers)))
- for broker in brokers:
- encoded.append(struct.pack('>ih%dsi' % len(broker.host),
- broker.nodeId, len(broker.host),
- broker.host, broker.port))
-
- encoded.append(struct.pack('>i', len(topics)))
- for topic in topics:
- encoded.append(struct.pack('>hh%dsi' % len(topic.topic),
- topic.error, len(topic.topic),
- topic.topic, len(topic.partitions)))
- for metadata in topic.partitions:
- encoded.append(struct.pack('>hiii', metadata.error,
- metadata.partition, metadata.leader,
- len(metadata.replicas)))
- if len(metadata.replicas) > 0:
- encoded.append(struct.pack('>%di' % len(metadata.replicas),
- *metadata.replicas))
-
- encoded.append(struct.pack('>i', len(metadata.isr)))
- if len(metadata.isr) > 0:
- encoded.append(struct.pack('>%di' % len(metadata.isr),
- *metadata.isr))
- return b''.join(encoded)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_metadata_response(self):
- node_brokers = [
- BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
- BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
- BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
- ]
-
- '''
- topic_partitions = [
- TopicMetadata(b"topic1", 0, [
- PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
- PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1)
- ]),
- TopicMetadata(b"topic2", 1, [
- PartitionMetadata(b"topic2", 0, 0, (), (), 0),
- ]),
- ]
- encoded = self._create_encoded_metadata_response(node_brokers,
- topic_partitions)
- decoded = KafkaProtocol.decode_metadata_response(encoded)
- self.assertEqual(decoded, (node_brokers, topic_partitions))
- '''
-
- def test_encode_consumer_metadata_request(self):
- expected = b"".join([
- struct.pack(">i", 17), # Total length of the request
- struct.pack('>h', 10), # API key consumer metadata
- struct.pack('>h', 0), # API version
- struct.pack('>i', 4), # Correlation ID
- struct.pack('>h3s', 3, b"cid"),# The client ID
- struct.pack('>h2s', 2, b"g1"), # Group "g1"
- ])
-
- encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1")
-
- self.assertEqual(encoded, expected)
-
- def test_decode_consumer_metadata_response(self):
- encoded = b"".join([
- struct.pack(">i", 42), # Correlation ID
- struct.pack(">h", 0), # No Error
- struct.pack(">i", 1), # Broker ID
- struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"), # Broker Host
- struct.pack(">i", 1000), # Broker Port
- ])
-
- results = KafkaProtocol.decode_consumer_metadata_response(encoded)
- self.assertEqual(results,
- ConsumerMetadataResponse(error = 0, nodeId = 1, host = b'brokers1.kafka.rdio.com', port = 1000)
- )
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_offset_request(self):
- expected = b"".join([
- struct.pack(">i", 21), # Total length of the request
- struct.pack('>h', 2), # Message type = offset fetch
- struct.pack('>h', 0), # API version
- struct.pack('>i', 4), # Correlation ID
- struct.pack('>h3s', 3, b"cid"), # The client ID
- struct.pack('>i', -1), # Replica Id
- struct.pack('>i', 0), # No topic/partitions
- ])
-
- encoded = KafkaProtocol.encode_offset_request(b"cid", 4)
-
- self.assertEqual(encoded, expected)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_offset_request__no_payload(self):
- expected = b"".join([
- struct.pack(">i", 65), # Total length of the request
-
- struct.pack('>h', 2), # Message type = offset fetch
- struct.pack('>h', 0), # API version
- struct.pack('>i', 4), # Correlation ID
- struct.pack('>h3s', 3, b"cid"), # The client ID
- struct.pack('>i', -1), # Replica Id
- struct.pack('>i', 1), # Num topics
- struct.pack(">h6s", 6, b"topic1"),# Topic for the request
- struct.pack(">i", 2), # Two partitions
-
- struct.pack(">i", 3), # Partition 3
- struct.pack(">q", -1), # No time offset
- struct.pack(">i", 1), # One offset requested
-
- struct.pack(">i", 4), # Partition 3
- struct.pack(">q", -1), # No time offset
- struct.pack(">i", 1), # One offset requested
- ])
-
- encoded = KafkaProtocol.encode_offset_request(b"cid", 4, [
- OffsetRequest(b'topic1', 3, -1, 1),
- OffsetRequest(b'topic1', 4, -1, 1),
- ])
-
- self.assertEqual(encoded, expected)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_offset_response(self):
- encoded = b"".join([
- struct.pack(">i", 42), # Correlation ID
- struct.pack(">i", 1), # One topics
- struct.pack(">h6s", 6, b"topic1"),# First topic
- struct.pack(">i", 2), # Two partitions
-
- struct.pack(">i", 2), # Partition 2
- struct.pack(">h", 0), # No error
- struct.pack(">i", 1), # One offset
- struct.pack(">q", 4), # Offset 4
-
- struct.pack(">i", 4), # Partition 4
- struct.pack(">h", 0), # No error
- struct.pack(">i", 1), # One offset
- struct.pack(">q", 8), # Offset 8
- ])
-
- results = KafkaProtocol.decode_offset_response(encoded)
- self.assertEqual(set(results), set([
- OffsetResponse(topic = b'topic1', partition = 2, error = 0, offsets=(4,)),
- OffsetResponse(topic = b'topic1', partition = 4, error = 0, offsets=(8,)),
- ]))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_offset_commit_request(self):
- header = b"".join([
- struct.pack('>i', 99), # Total message length
-
- struct.pack('>h', 8), # Message type = offset commit
- struct.pack('>h', 0), # API version
- struct.pack('>i', 42), # Correlation ID
- struct.pack('>h9s', 9, b"client_id"),# The client ID
- struct.pack('>h8s', 8, b"group_id"), # The group to commit for
- struct.pack('>i', 2), # Num topics
- ])
-
- topic1 = b"".join([
- struct.pack(">h6s", 6, b"topic1"), # Topic for the request
- struct.pack(">i", 2), # Two partitions
- struct.pack(">i", 0), # Partition 0
- struct.pack(">q", 123), # Offset 123
- struct.pack(">h", -1), # Null metadata
- struct.pack(">i", 1), # Partition 1
- struct.pack(">q", 234), # Offset 234
- struct.pack(">h", -1), # Null metadata
- ])
-
- topic2 = b"".join([
- struct.pack(">h6s", 6, b"topic2"), # Topic for the request
- struct.pack(">i", 1), # One partition
- struct.pack(">i", 2), # Partition 2
- struct.pack(">q", 345), # Offset 345
- struct.pack(">h", -1), # Null metadata
- ])
-
- expected1 = b"".join([ header, topic1, topic2 ])
- expected2 = b"".join([ header, topic2, topic1 ])
-
- encoded = KafkaProtocol.encode_offset_commit_request(b"client_id", 42, b"group_id", [
- OffsetCommitRequest(b"topic1", 0, 123, None),
- OffsetCommitRequest(b"topic1", 1, 234, None),
- OffsetCommitRequest(b"topic2", 2, 345, None),
- ])
-
- self.assertIn(encoded, [ expected1, expected2 ])
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_offset_commit_response(self):
- encoded = b"".join([
- struct.pack(">i", 42), # Correlation ID
- struct.pack(">i", 1), # One topic
- struct.pack(">h6s", 6, b"topic1"),# First topic
- struct.pack(">i", 2), # Two partitions
-
- struct.pack(">i", 2), # Partition 2
- struct.pack(">h", 0), # No error
-
- struct.pack(">i", 4), # Partition 4
- struct.pack(">h", 0), # No error
- ])
-
- results = KafkaProtocol.decode_offset_commit_response(encoded)
- self.assertEqual(set(results), set([
- OffsetCommitResponse(topic = b'topic1', partition = 2, error = 0),
- OffsetCommitResponse(topic = b'topic1', partition = 4, error = 0),
- ]))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_offset_fetch_request(self):
- header = b"".join([
- struct.pack('>i', 69), # Total message length
- struct.pack('>h', 9), # Message type = offset fetch
- struct.pack('>h', 0), # API version
- struct.pack('>i', 42), # Correlation ID
- struct.pack('>h9s', 9, b"client_id"),# The client ID
- struct.pack('>h8s', 8, b"group_id"), # The group to commit for
- struct.pack('>i', 2), # Num topics
- ])
-
- topic1 = b"".join([
- struct.pack(">h6s", 6, b"topic1"), # Topic for the request
- struct.pack(">i", 2), # Two partitions
- struct.pack(">i", 0), # Partition 0
- struct.pack(">i", 1), # Partition 1
- ])
-
- topic2 = b"".join([
- struct.pack(">h6s", 6, b"topic2"), # Topic for the request
- struct.pack(">i", 1), # One partitions
- struct.pack(">i", 2), # Partition 2
- ])
-
- expected1 = b"".join([ header, topic1, topic2 ])
- expected2 = b"".join([ header, topic2, topic1 ])
-
- encoded = KafkaProtocol.encode_offset_fetch_request(b"client_id", 42, b"group_id", [
- OffsetFetchRequest(b"topic1", 0),
- OffsetFetchRequest(b"topic1", 1),
- OffsetFetchRequest(b"topic2", 2),
- ])
-
- self.assertIn(encoded, [ expected1, expected2 ])
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_offset_fetch_response(self):
- encoded = b"".join([
- struct.pack(">i", 42), # Correlation ID
- struct.pack(">i", 1), # One topics
- struct.pack(">h6s", 6, b"topic1"),# First topic
- struct.pack(">i", 2), # Two partitions
-
- struct.pack(">i", 2), # Partition 2
- struct.pack(">q", 4), # Offset 4
- struct.pack(">h4s", 4, b"meta"), # Metadata
- struct.pack(">h", 0), # No error
-
- struct.pack(">i", 4), # Partition 4
- struct.pack(">q", 8), # Offset 8
- struct.pack(">h4s", 4, b"meta"), # Metadata
- struct.pack(">h", 0), # No error
- ])
-
- results = KafkaProtocol.decode_offset_fetch_response(encoded)
- self.assertEqual(set(results), set([
- OffsetFetchResponse(topic = b'topic1', partition = 2, offset = 4, error = 0, metadata = b"meta"),
- OffsetFetchResponse(topic = b'topic1', partition = 4, offset = 8, error = 0, metadata = b"meta"),
- ]))
-
- @contextmanager
- def mock_create_message_fns(self):
- import kafka.protocol
- with patch.object(kafka.protocol.legacy, "create_message",
- return_value=sentinel.message):
- with patch.object(kafka.protocol.legacy, "create_gzip_message",
- return_value=sentinel.gzip_message):
- with patch.object(kafka.protocol.legacy, "create_snappy_message",
- return_value=sentinel.snappy_message):
- yield
-
- def test_create_message_set(self):
- messages = [(1, "k1"), (2, "k2"), (3, "k3")]
-
- # Default codec is CODEC_NONE. Expect list of regular messages.
- expect = [sentinel.message] * len(messages)
- with self.mock_create_message_fns():
- message_set = create_message_set(messages)
- self.assertEqual(message_set, expect)
-
- # CODEC_NONE: Expect list of regular messages.
- expect = [sentinel.message] * len(messages)
- with self.mock_create_message_fns():
- message_set = create_message_set(messages, CODEC_NONE)
- self.assertEqual(message_set, expect)
-
- # CODEC_GZIP: Expect list of one gzip-encoded message.
- expect = [sentinel.gzip_message]
- with self.mock_create_message_fns():
- message_set = create_message_set(messages, CODEC_GZIP)
- self.assertEqual(message_set, expect)
-
- # CODEC_SNAPPY: Expect list of one snappy-encoded message.
- expect = [sentinel.snappy_message]
- with self.mock_create_message_fns():
- message_set = create_message_set(messages, CODEC_SNAPPY)
- self.assertEqual(message_set, expect)
-
- # Unknown codec should raise UnsupportedCodecError.
- with self.assertRaises(UnsupportedCodecError):
- create_message_set(messages, -1)
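The deleted protocol tests pinned the legacy v0 message layout down to the byte. For reference, a self-contained sketch of that layout, with a hypothetical helper name: the CRC covers magic, attributes, key, and value, and a -1 length prefix encodes a null field.

```python
import struct
import zlib

def encode_v0_message(value, key=None):
    """Hypothetical encoder for the legacy v0 on-wire message body."""
    def bytes_field(b):
        if b is None:
            return struct.pack('>i', -1)      # -1 length marks a null field
        return struct.pack('>i', len(b)) + b

    body = struct.pack('>bb', 0, 0) + bytes_field(key) + bytes_field(value)
    # Packed unsigned here; the deleted tests pack the same 4 bytes signed.
    crc = zlib.crc32(body) & 0xffffffff
    return struct.pack('>I', crc) + body

encoded = encode_v0_message(b'test', b'key')
assert len(encoded) == 4 + 2 + (4 + 3) + (4 + 4)  # CRC, magic/flags, key, value
```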
diff --git a/test/test_util.py b/test/test_util.py
deleted file mode 100644
index a4dbaa5..0000000
--- a/test/test_util.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-import struct
-
-from kafka.vendor import six
-from . import unittest
-
-import kafka.errors
-import kafka.structs
-import kafka.util
-
-
-class UtilTest(unittest.TestCase):
- @unittest.skip("Unwritten")
- def test_relative_unpack(self):
- pass
-
- def test_write_int_string(self):
- self.assertEqual(
- kafka.util.write_int_string(b'some string'),
- b'\x00\x00\x00\x0bsome string'
- )
-
- def test_write_int_string__unicode(self):
- with self.assertRaises(TypeError) as cm:
- kafka.util.write_int_string(u'unicode')
- #: :type: TypeError
- te = cm.exception
- if six.PY2:
- self.assertIn('unicode', str(te))
- else:
- self.assertIn('str', str(te))
- self.assertIn('to be bytes', str(te))
-
- def test_write_int_string__empty(self):
- self.assertEqual(
- kafka.util.write_int_string(b''),
- b'\x00\x00\x00\x00'
- )
-
- def test_write_int_string__null(self):
- self.assertEqual(
- kafka.util.write_int_string(None),
- b'\xff\xff\xff\xff'
- )
-
- def test_read_short_string(self):
- self.assertEqual(kafka.util.read_short_string(b'\xff\xff', 0), (None, 2))
- self.assertEqual(kafka.util.read_short_string(b'\x00\x00', 0), (b'', 2))
- self.assertEqual(kafka.util.read_short_string(b'\x00\x0bsome string', 0), (b'some string', 13))
-
- def test_relative_unpack2(self):
- self.assertEqual(
- kafka.util.relative_unpack('>hh', b'\x00\x01\x00\x00\x02', 0),
- ((1, 0), 4)
- )
-
- def test_relative_unpack3(self):
- with self.assertRaises(kafka.errors.BufferUnderflowError):
- kafka.util.relative_unpack('>hh', b'\x00', 0)
-
- def test_group_by_topic_and_partition(self):
- t = kafka.structs.TopicPartition
-
- tps = [
- t("a", 1),
- t("a", 2),
- t("a", 3),
- t("b", 3),
- ]
-
- self.assertEqual(kafka.util.group_by_topic_and_partition(tps), {
- "a": {
- 1: t("a", 1),
- 2: t("a", 2),
- 3: t("a", 3),
- },
- "b": {
- 3: t("b", 3),
- }
- })
-
- # should not be able to group duplicate topic-partitions
- t1 = t("a", 1)
- with self.assertRaises(AssertionError):
- kafka.util.group_by_topic_and_partition([t1, t1])
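Note: the tests deleted above pinned down kafka.util.group_by_topic_and_partition(), which built a nested topic -> partition -> payload mapping and asserted against duplicate topic-partitions. A hypothetical stand-alone sketch of that contract (not the library's actual implementation):

from collections import defaultdict

from kafka.structs import TopicPartition

def group_by_topic_and_partition(payloads):
    """Group payloads into {topic: {partition: payload}}; duplicates are an error."""
    grouped = defaultdict(dict)
    for p in payloads:
        assert p.partition not in grouped[p.topic], 'duplicate topic-partition: %r' % (p,)
        grouped[p.topic][p.partition] = p
    return dict(grouped)

parts = [TopicPartition('a', 1), TopicPartition('a', 2), TopicPartition('b', 3)]
assert group_by_topic_and_partition(parts)['a'][2] == TopicPartition('a', 2)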
diff --git a/test/testutil.py b/test/testutil.py
index 650f9bf..77a6673 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -4,18 +4,6 @@ import os
import random
import string
import time
-import uuid
-
-import pytest
-from . import unittest
-
-from kafka import SimpleClient
-from kafka.errors import (
- LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
- NotLeaderForPartitionError, UnknownTopicOrPartitionError,
- FailedPayloadsError
-)
-from kafka.structs import OffsetRequestPayload
def random_string(length):
@@ -32,21 +20,6 @@ def env_kafka_version():
return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
-def current_offset(client, topic, partition, kafka_broker=None):
- """Get the current offset of a topic's partition
- """
- try:
- offsets, = client.send_offset_request([OffsetRequestPayload(topic,
- partition, -1, 1)])
- except Exception:
- # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
- if kafka_broker:
- kafka_broker.dump_logs()
- raise
- else:
- return offsets.offsets[0]
-
-
def assert_message_count(messages, num_messages):
"""Check that we received the expected number of messages with no duplicates."""
# Make sure we got them all
@@ -58,84 +31,6 @@ def assert_message_count(messages, num_messages):
assert len(unique_messages) == num_messages
-class KafkaIntegrationTestCase(unittest.TestCase):
- create_client = True
- topic = None
- zk = None
- server = None
-
- def setUp(self):
- super(KafkaIntegrationTestCase, self).setUp()
- if not os.environ.get('KAFKA_VERSION'):
- self.skipTest('Integration test requires KAFKA_VERSION')
-
- if not self.topic:
- topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
- self.topic = topic
-
- if self.create_client:
- self.client = SimpleClient('%s:%d' % (self.server.host, self.server.port))
-
- timeout = time.time() + 30
- while time.time() < timeout:
- try:
- self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False)
- if self.client.has_metadata_for_topic(self.topic):
- break
- except (LeaderNotAvailableError, InvalidTopicError):
- time.sleep(1)
- else:
- raise KafkaTimeoutError('Timeout loading topic metadata!')
-
- # Ensure topic partitions have been created on all brokers to avoid UnknownTopicOrPartitionError
- # TODO: It might be a good idea to move this to self.client.ensure_topic_exists
- for partition in self.client.get_partition_ids_for_topic(self.topic):
- while True:
- try:
- req = OffsetRequestPayload(self.topic, partition, -1, 100)
- self.client.send_offset_request([req])
- break
- except (NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError):
- if time.time() > timeout:
- raise KafkaTimeoutError('Timeout waiting for partition leaders!')
- time.sleep(.1)
-
- self._messages = {}
-
- def tearDown(self):
- super(KafkaIntegrationTestCase, self).tearDown()
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- if self.create_client:
- self.client.close()
-
- def current_offset(self, topic, partition):
- try:
- offsets, = self.client.send_offset_request([OffsetRequestPayload(topic,
- partition, -1, 1)])
- except Exception:
- # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
- self.zk.child.dump_logs()
- self.server.child.dump_logs()
- raise
- else:
- return offsets.offsets[0]
-
- def msgs(self, iterable):
- return [self.msg(x) for x in iterable]
-
- def msg(self, s):
- if s not in self._messages:
- self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4()))
-
- return self._messages[s].encode('utf-8')
-
- def key(self, k):
- return k.encode('utf-8')
-
-
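Note: KafkaIntegrationTestCase, deleted above, handled topic creation and client teardown through unittest's setUp()/tearDown(). A hedged sketch of the same produce/consume round trip as a plain pytest test against the modern clients (the KAFKA_BOOTSTRAP env var, default address, and topic name are illustrative, not conventions defined in this repo):

import os
import uuid

import pytest

from kafka import KafkaConsumer, KafkaProducer

@pytest.mark.skipif(not os.environ.get('KAFKA_VERSION'),
                    reason='Integration test requires KAFKA_VERSION')
def test_produce_consume_round_trip():
    bootstrap = os.environ.get('KAFKA_BOOTSTRAP', 'localhost:9092')  # assumed env var
    topic = 'test-%s' % uuid.uuid4()  # unique topic per run, like the old setUp()

    producer = KafkaProducer(bootstrap_servers=bootstrap)
    producer.send(topic, b'payload').get(timeout=30)  # waits for the broker ack
    producer.close()

    consumer = KafkaConsumer(topic, bootstrap_servers=bootstrap,
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=10000)
    values = [msg.value for msg in consumer]  # iteration stops after the idle timeout
    consumer.close()
    assert values == [b'payload']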
class Timer(object):
def __enter__(self):
self.start = time.time()