Diffstat (limited to 'test')
-rw-r--r-- | test/__init__.py                  |   7
-rw-r--r-- | test/conftest.py                  |   9
-rw-r--r-- | test/fixtures.py                  |   7
-rw-r--r-- | test/test_client.py               | 405
-rw-r--r-- | test/test_client_integration.py   |  95
-rw-r--r-- | test/test_consumer.py             | 135
-rw-r--r-- | test/test_consumer_integration.py | 498
-rw-r--r-- | test/test_context.py              | 117
-rw-r--r-- | test/test_failover_integration.py | 240
-rw-r--r-- | test/test_package.py              |  18
-rw-r--r-- | test/test_partitioner.py          |  39
-rw-r--r-- | test/test_producer_integration.py | 529
-rw-r--r-- | test/test_producer_legacy.py      | 257
-rw-r--r-- | test/test_protocol_legacy.py      | 848
-rw-r--r-- | test/test_util.py                 |  85
-rw-r--r-- | test/testutil.py                  | 105
16 files changed, 19 insertions, 3375 deletions
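The commit below removes the legacy SimpleClient, SimpleProducer, SimpleConsumer, and MultiProcessConsumer code paths from the test suite; the remaining fixtures use only the modern KafkaClient, KafkaProducer, and KafkaConsumer APIs. A minimal sketch of the equivalent modern usage is shown here for reference; the broker address and topic name are placeholders, not values taken from the diff.

    # Sketch of the KafkaProducer/KafkaConsumer API that replaces the removed
    # SimpleProducer/SimpleConsumer helpers. 'localhost:9092' and 'example-topic'
    # are illustrative placeholders.
    from kafka import KafkaConsumer, KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    producer.send('example-topic', b'some message')   # async send; returns a future
    producer.flush()                                  # block until all sends complete
    producer.close()

    consumer = KafkaConsumer(
        'example-topic',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        consumer_timeout_ms=1000,                     # stop iterating after 1s of no messages
    )
    for message in consumer:
        print(message.topic, message.partition, message.offset, message.value)
    consumer.close()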
diff --git a/test/__init__.py b/test/__init__.py index 3d2ba3d..71f667d 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -1,12 +1,5 @@ from __future__ import absolute_import -import sys - -if sys.version_info < (2, 7): - import unittest2 as unittest # pylint: disable=import-error -else: - import unittest - # Set default logging handler to avoid "No handler found" warnings. import logging try: # Python 2.7+ diff --git a/test/conftest.py b/test/conftest.py index bbe4048..3fa0262 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -43,15 +43,6 @@ def kafka_broker_factory(zookeeper): @pytest.fixture -def simple_client(kafka_broker, request, topic): - """Return a SimpleClient fixture""" - client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,)) - client.ensure_topic_exists(topic) - yield client - client.close() - - -@pytest.fixture def kafka_client(kafka_broker, request): """Return a KafkaClient fixture""" (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,)) diff --git a/test/fixtures.py b/test/fixtures.py index 68572b5..557fca6 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -13,8 +13,7 @@ import py from kafka.vendor.six.moves import urllib, range from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401 -from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient, KafkaAdminClient -from kafka.client_async import KafkaClient +from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer from kafka.protocol.admin import CreateTopicsRequest from kafka.protocol.metadata import MetadataRequest from test.testutil import env_kafka_version, random_string @@ -524,7 +523,3 @@ class KafkaFixture(Fixture): for x in range(cnt): params['client_id'] = '%s_%s' % (client_id, random_string(4)) yield KafkaProducer(**params) - - def get_simple_client(self, **params): - params.setdefault('client_id', 'simple_client') - return SimpleClient(self.bootstrap_server(), **params) diff --git a/test/test_client.py b/test/test_client.py deleted file mode 100644 index 1c68978..0000000 --- a/test/test_client.py +++ /dev/null @@ -1,405 +0,0 @@ -import socket - -from mock import ANY, MagicMock, patch -from operator import itemgetter -from kafka.vendor import six -from . 
import unittest - -from kafka import SimpleClient -from kafka.errors import ( - KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError, - UnknownTopicOrPartitionError, FailedPayloadsError) -from kafka.future import Future -from kafka.protocol import KafkaProtocol, create_message -from kafka.protocol.metadata import MetadataResponse -from kafka.structs import ProduceRequestPayload, BrokerMetadata, TopicPartition - - -NO_ERROR = 0 -UNKNOWN_TOPIC_OR_PARTITION = 3 -NO_LEADER = 5 - - -def mock_conn(conn, success=True): - mocked = MagicMock() - mocked.connected.return_value = True - if success: - mocked.send.return_value = Future().success(True) - else: - mocked.send.return_value = Future().failure(Exception()) - conn.return_value = mocked - conn.recv.return_value = [] - - -class TestSimpleClient(unittest.TestCase): - def test_init_with_list(self): - with patch.object(SimpleClient, 'load_metadata_for_topics'): - client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092']) - - self.assertEqual( - sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC), - ('kafka03', 9092, socket.AF_UNSPEC)]), - sorted(client.hosts)) - - def test_init_with_csv(self): - with patch.object(SimpleClient, 'load_metadata_for_topics'): - client = SimpleClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092') - - self.assertEqual( - sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC), - ('kafka03', 9092, socket.AF_UNSPEC)]), - sorted(client.hosts)) - - def test_init_with_unicode_csv(self): - with patch.object(SimpleClient, 'load_metadata_for_topics'): - client = SimpleClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092') - - self.assertEqual( - sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC), - ('kafka03', 9092, socket.AF_UNSPEC)]), - sorted(client.hosts)) - - @patch.object(SimpleClient, '_get_conn') - @patch.object(SimpleClient, 'load_metadata_for_topics') - def test_send_broker_unaware_request_fail(self, load_metadata, conn): - mocked_conns = { - ('kafka01', 9092): MagicMock(), - ('kafka02', 9092): MagicMock() - } - for val in mocked_conns.values(): - mock_conn(val, success=False) - - def mock_get_conn(host, port, afi): - return mocked_conns[(host, port)] - conn.side_effect = mock_get_conn - - client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092']) - - req = KafkaProtocol.encode_metadata_request() - with self.assertRaises(KafkaUnavailableError): - client._send_broker_unaware_request(payloads=['fake request'], - encoder_fn=MagicMock(return_value='fake encoded message'), - decoder_fn=lambda x: x) - - for key, conn in six.iteritems(mocked_conns): - conn.send.assert_called_with('fake encoded message') - - def test_send_broker_unaware_request(self): - mocked_conns = { - ('kafka01', 9092): MagicMock(), - ('kafka02', 9092): MagicMock(), - ('kafka03', 9092): MagicMock() - } - # inject BrokerConnection side effects - mock_conn(mocked_conns[('kafka01', 9092)], success=False) - mock_conn(mocked_conns[('kafka03', 9092)], success=False) - future = Future() - mocked_conns[('kafka02', 9092)].send.return_value = future - mocked_conns[('kafka02', 9092)].recv.return_value = [('valid response', future)] - - def mock_get_conn(host, port, afi): - return mocked_conns[(host, port)] - - # patch to avoid making requests before we want it - with patch.object(SimpleClient, 'load_metadata_for_topics'): - with patch.object(SimpleClient, '_get_conn', side_effect=mock_get_conn): - - client = 
SimpleClient(hosts='kafka01:9092,kafka02:9092') - resp = client._send_broker_unaware_request(payloads=['fake request'], - encoder_fn=MagicMock(), - decoder_fn=lambda x: x) - - self.assertEqual('valid response', resp) - mocked_conns[('kafka02', 9092)].recv.assert_called_once_with() - - @patch('kafka.SimpleClient._get_conn') - @patch('kafka.client.KafkaProtocol') - def test_load_metadata(self, protocol, conn): - - mock_conn(conn) - - brokers = [ - BrokerMetadata(0, 'broker_1', 4567, None), - BrokerMetadata(1, 'broker_2', 5678, None) - ] - resp0_brokers = list(map(itemgetter(0, 1, 2), brokers)) - - topics = [ - (NO_ERROR, 'topic_1', [ - (NO_ERROR, 0, 1, [1, 2], [1, 2]) - ]), - (NO_ERROR, 'topic_noleader', [ - (NO_LEADER, 0, -1, [], []), - (NO_LEADER, 1, -1, [], []), - ]), - (NO_LEADER, 'topic_no_partitions', []), - (UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []), - (NO_ERROR, 'topic_3', [ - (NO_ERROR, 0, 0, [0, 1], [0, 1]), - (NO_ERROR, 1, 1, [1, 0], [1, 0]), - (NO_ERROR, 2, 0, [0, 1], [0, 1]) - ]) - ] - protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics) - - # client loads metadata at init - client = SimpleClient(hosts=['broker_1:4567']) - self.assertDictEqual({ - TopicPartition('topic_1', 0): brokers[1], - TopicPartition('topic_noleader', 0): None, - TopicPartition('topic_noleader', 1): None, - TopicPartition('topic_3', 0): brokers[0], - TopicPartition('topic_3', 1): brokers[1], - TopicPartition('topic_3', 2): brokers[0]}, - client.topics_to_brokers) - - # if we ask for metadata explicitly, it should raise errors - with self.assertRaises(LeaderNotAvailableError): - client.load_metadata_for_topics('topic_no_partitions') - - with self.assertRaises(UnknownTopicOrPartitionError): - client.load_metadata_for_topics('topic_unknown') - - # This should not raise - client.load_metadata_for_topics('topic_no_leader') - - @patch('kafka.SimpleClient._get_conn') - @patch('kafka.client.KafkaProtocol') - def test_has_metadata_for_topic(self, protocol, conn): - - mock_conn(conn) - - brokers = [ - BrokerMetadata(0, 'broker_1', 4567, None), - BrokerMetadata(1, 'broker_2', 5678, None) - ] - resp0_brokers = list(map(itemgetter(0, 1, 2), brokers)) - - topics = [ - (NO_LEADER, 'topic_still_creating', []), - (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []), - (NO_ERROR, 'topic_noleaders', [ - (NO_LEADER, 0, -1, [], []), - (NO_LEADER, 1, -1, [], []), - ]), - ] - protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics) - - client = SimpleClient(hosts=['broker_1:4567']) - - # Topics with no partitions return False - self.assertFalse(client.has_metadata_for_topic('topic_still_creating')) - self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist')) - - # Topic with partition metadata, but no leaders return True - self.assertTrue(client.has_metadata_for_topic('topic_noleaders')) - - @patch('kafka.SimpleClient._get_conn') - @patch('kafka.client.KafkaProtocol.decode_metadata_response') - def test_ensure_topic_exists(self, decode_metadata_response, conn): - - mock_conn(conn) - - brokers = [ - BrokerMetadata(0, 'broker_1', 4567, None), - BrokerMetadata(1, 'broker_2', 5678, None) - ] - resp0_brokers = list(map(itemgetter(0, 1, 2), brokers)) - - topics = [ - (NO_LEADER, 'topic_still_creating', []), - (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []), - (NO_ERROR, 'topic_noleaders', [ - (NO_LEADER, 0, -1, [], []), - (NO_LEADER, 1, -1, [], []), - ]), - ] - decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, 
topics) - - client = SimpleClient(hosts=['broker_1:4567']) - - with self.assertRaises(UnknownTopicOrPartitionError): - client.ensure_topic_exists('topic_doesnt_exist', timeout=1) - - with self.assertRaises(KafkaTimeoutError): - client.ensure_topic_exists('topic_still_creating', timeout=1) - - # This should not raise - client.ensure_topic_exists('topic_noleaders', timeout=1) - - @patch('kafka.SimpleClient._get_conn') - @patch('kafka.client.KafkaProtocol') - def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn): - "Get leader for partitions reload metadata if it is not available" - - mock_conn(conn) - - brokers = [ - BrokerMetadata(0, 'broker_1', 4567, None), - BrokerMetadata(1, 'broker_2', 5678, None) - ] - resp0_brokers = list(map(itemgetter(0, 1, 2), brokers)) - - topics = [ - (NO_LEADER, 'topic_no_partitions', []) - ] - protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics) - - client = SimpleClient(hosts=['broker_1:4567']) - - # topic metadata is loaded but empty - self.assertDictEqual({}, client.topics_to_brokers) - - topics = [ - (NO_ERROR, 'topic_one_partition', [ - (NO_ERROR, 0, 0, [0, 1], [0, 1]) - ]) - ] - protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics) - - # calling _get_leader_for_partition (from any broker aware request) - # will try loading metadata again for the same topic - leader = client._get_leader_for_partition('topic_one_partition', 0) - - self.assertEqual(brokers[0], leader) - self.assertDictEqual({ - TopicPartition('topic_one_partition', 0): brokers[0]}, - client.topics_to_brokers) - - @patch('kafka.SimpleClient._get_conn') - @patch('kafka.client.KafkaProtocol') - def test_get_leader_for_unassigned_partitions(self, protocol, conn): - - mock_conn(conn) - - brokers = [ - BrokerMetadata(0, 'broker_1', 4567, None), - BrokerMetadata(1, 'broker_2', 5678, None) - ] - resp0_brokers = list(map(itemgetter(0, 1, 2), brokers)) - - topics = [ - (NO_LEADER, 'topic_no_partitions', []), - (UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []), - ] - protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics) - - client = SimpleClient(hosts=['broker_1:4567']) - - self.assertDictEqual({}, client.topics_to_brokers) - - with self.assertRaises(LeaderNotAvailableError): - client._get_leader_for_partition('topic_no_partitions', 0) - - with self.assertRaises(UnknownTopicOrPartitionError): - client._get_leader_for_partition('topic_unknown', 0) - - @patch('kafka.SimpleClient._get_conn') - @patch('kafka.client.KafkaProtocol') - def test_get_leader_exceptions_when_noleader(self, protocol, conn): - - mock_conn(conn) - - brokers = [ - BrokerMetadata(0, 'broker_1', 4567, None), - BrokerMetadata(1, 'broker_2', 5678, None) - ] - resp0_brokers = list(map(itemgetter(0, 1, 2), brokers)) - - topics = [ - (NO_ERROR, 'topic_noleader', [ - (NO_LEADER, 0, -1, [], []), - (NO_LEADER, 1, -1, [], []), - ]), - ] - protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics) - - client = SimpleClient(hosts=['broker_1:4567']) - self.assertDictEqual( - { - TopicPartition('topic_noleader', 0): None, - TopicPartition('topic_noleader', 1): None - }, - client.topics_to_brokers) - - # No leader partitions -- raise LeaderNotAvailableError - with self.assertRaises(LeaderNotAvailableError): - self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0)) - with self.assertRaises(LeaderNotAvailableError): - 
self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1)) - - # Unknown partitions -- raise UnknownTopicOrPartitionError - with self.assertRaises(UnknownTopicOrPartitionError): - self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2)) - - topics = [ - (NO_ERROR, 'topic_noleader', [ - (NO_ERROR, 0, 0, [0, 1], [0, 1]), - (NO_ERROR, 1, 1, [1, 0], [1, 0]) - ]), - ] - protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics) - self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0)) - self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1)) - - @patch.object(SimpleClient, '_get_conn') - @patch('kafka.client.KafkaProtocol') - def test_send_produce_request_raises_when_noleader(self, protocol, conn): - mock_conn(conn) - - brokers = [ - BrokerMetadata(0, 'broker_1', 4567, None), - BrokerMetadata(1, 'broker_2', 5678, None) - ] - resp0_brokers = list(map(itemgetter(0, 1, 2), brokers)) - - topics = [ - (NO_ERROR, 'topic_noleader', [ - (NO_LEADER, 0, -1, [], []), - (NO_LEADER, 1, -1, [], []), - ]), - ] - protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics) - - client = SimpleClient(hosts=['broker_1:4567']) - - requests = [ProduceRequestPayload( - "topic_noleader", 0, - [create_message("a"), create_message("b")])] - - with self.assertRaises(FailedPayloadsError): - client.send_produce_request(requests) - - @patch('kafka.SimpleClient._get_conn') - @patch('kafka.client.KafkaProtocol') - def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn): - - mock_conn(conn) - - brokers = [ - BrokerMetadata(0, 'broker_1', 4567, None), - BrokerMetadata(1, 'broker_2', 5678, None) - ] - resp0_brokers = list(map(itemgetter(0, 1, 2), brokers)) - - topics = [ - (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []), - ] - protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics) - - client = SimpleClient(hosts=['broker_1:4567']) - - requests = [ProduceRequestPayload( - "topic_doesnt_exist", 0, - [create_message("a"), create_message("b")])] - - with self.assertRaises(FailedPayloadsError): - client.send_produce_request(requests) - - def test_correlation_rollover(self): - with patch.object(SimpleClient, 'load_metadata_for_topics'): - big_num = 2**31 - 3 - client = SimpleClient(hosts=(), correlation_id=big_num) - self.assertEqual(big_num + 1, client._next_id()) - self.assertEqual(big_num + 2, client._next_id()) - self.assertEqual(0, client._next_id()) diff --git a/test/test_client_integration.py b/test/test_client_integration.py deleted file mode 100644 index a983ce1..0000000 --- a/test/test_client_integration.py +++ /dev/null @@ -1,95 +0,0 @@ -import os - -import pytest - -from kafka.errors import KafkaTimeoutError -from kafka.protocol import create_message -from kafka.structs import ( - FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload, - ProduceRequestPayload) - -from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, env_kafka_version - - -class TestKafkaClientIntegration(KafkaIntegrationTestCase): - @classmethod - def setUpClass(cls): # noqa - if not os.environ.get('KAFKA_VERSION'): - return - - cls.zk = ZookeeperFixture.instance() - cls.server = KafkaFixture.instance(0, cls.zk) - - @classmethod - def tearDownClass(cls): # noqa - if not os.environ.get('KAFKA_VERSION'): - return - - cls.server.close() - cls.zk.close() - - def 
test_consume_none(self): - fetch = FetchRequestPayload(self.topic, 0, 0, 1024) - - fetch_resp, = self.client.send_fetch_request([fetch]) - self.assertEqual(fetch_resp.error, 0) - self.assertEqual(fetch_resp.topic, self.topic) - self.assertEqual(fetch_resp.partition, 0) - - messages = list(fetch_resp.messages) - self.assertEqual(len(messages), 0) - - def test_ensure_topic_exists(self): - - # assume that self.topic was created by setUp - # if so, this should succeed - self.client.ensure_topic_exists(self.topic, timeout=1) - - # ensure_topic_exists should fail with KafkaTimeoutError - with self.assertRaises(KafkaTimeoutError): - self.client.ensure_topic_exists('this_topic_doesnt_exist', timeout=0) - - def test_send_produce_request_maintains_request_response_order(self): - - self.client.ensure_topic_exists('foo') - self.client.ensure_topic_exists('bar') - - requests = [ - ProduceRequestPayload( - 'foo', 0, - [create_message(b'a'), create_message(b'b')]), - ProduceRequestPayload( - 'bar', 1, - [create_message(b'a'), create_message(b'b')]), - ProduceRequestPayload( - 'foo', 1, - [create_message(b'a'), create_message(b'b')]), - ProduceRequestPayload( - 'bar', 0, - [create_message(b'a'), create_message(b'b')]), - ] - - responses = self.client.send_produce_request(requests) - while len(responses): - request = requests.pop() - response = responses.pop() - self.assertEqual(request.topic, response.topic) - self.assertEqual(request.partition, response.partition) - - - #################### - # Offset Tests # - #################### - - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_commit_fetch_offsets(self): - req = OffsetCommitRequestPayload(self.topic, 0, 42, 'metadata') - (resp,) = self.client.send_offset_commit_request('group', [req]) - self.assertEqual(resp.error, 0) - - req = OffsetFetchRequestPayload(self.topic, 0) - (resp,) = self.client.send_offset_fetch_request('group', [req]) - self.assertEqual(resp.error, 0) - self.assertEqual(resp.offset, 42) - self.assertEqual(resp.metadata, '') # Metadata isn't stored for now diff --git a/test/test_consumer.py b/test/test_consumer.py index edcc2d8..436fe55 100644 --- a/test/test_consumer.py +++ b/test/test_consumer.py @@ -1,15 +1,7 @@ -import sys - -from mock import MagicMock, patch -from . 
import unittest import pytest -from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer -from kafka.errors import ( - FailedPayloadsError, KafkaConfigurationError, NotLeaderForPartitionError, - UnknownTopicOrPartitionError) -from kafka.structs import ( - FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload) +from kafka import KafkaConsumer +from kafka.errors import KafkaConfigurationError class TestKafkaConsumer: @@ -32,126 +24,3 @@ class TestKafkaConsumer: assert sub == set(['foo']) sub.add('fizz') assert consumer.subscription() == set(['foo']) - - -class TestMultiProcessConsumer(unittest.TestCase): - @unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows') - def test_partition_list(self): - client = MagicMock() - partitions = (0,) - with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets: - MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions) - self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) ) - self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member - - -class TestSimpleConsumer(unittest.TestCase): - def test_non_integer_partitions(self): - with self.assertRaises(AssertionError): - SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0']) - - def test_simple_consumer_failed_payloads(self): - client = MagicMock() - consumer = SimpleConsumer(client, group=None, - topic='topic', partitions=[0, 1], - auto_commit=False) - - def failed_payloads(payload): - return FailedPayloadsError(payload) - - client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads) - - # This should not raise an exception - consumer.get_messages(5) - - def test_simple_consumer_leader_change(self): - client = MagicMock() - consumer = SimpleConsumer(client, group=None, - topic='topic', partitions=[0, 1], - auto_commit=False) - - # Mock so that only the first request gets a valid response - def not_leader(request): - return FetchResponsePayload(request.topic, request.partition, - NotLeaderForPartitionError.errno, -1, ()) - - client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader) - - # This should not raise an exception - consumer.get_messages(20) - - # client should have updated metadata - self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1) - self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1) - - def test_simple_consumer_unknown_topic_partition(self): - client = MagicMock() - consumer = SimpleConsumer(client, group=None, - topic='topic', partitions=[0, 1], - auto_commit=False) - - # Mock so that only the first request gets a valid response - def unknown_topic_partition(request): - return FetchResponsePayload(request.topic, request.partition, - UnknownTopicOrPartitionError.errno, -1, ()) - - client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition) - - # This should not raise an exception - with self.assertRaises(UnknownTopicOrPartitionError): - consumer.get_messages(20) - - def test_simple_consumer_commit_does_not_raise(self): - client = MagicMock() - client.get_partition_ids_for_topic.return_value = [0, 1] - - def mock_offset_fetch_request(group, payloads, **kwargs): - return [OffsetFetchResponsePayload(p.topic, p.partition, 0, b'', 0) for p in payloads] - - client.send_offset_fetch_request.side_effect = mock_offset_fetch_request - - def mock_offset_commit_request(group, payloads, **kwargs): - raise 
FailedPayloadsError(payloads[0]) - - client.send_offset_commit_request.side_effect = mock_offset_commit_request - - consumer = SimpleConsumer(client, group='foobar', - topic='topic', partitions=[0, 1], - auto_commit=False) - - # Mock internal commit check - consumer.count_since_commit = 10 - - # This should not raise an exception - self.assertFalse(consumer.commit(partitions=[0, 1])) - - def test_simple_consumer_reset_partition_offset(self): - client = MagicMock() - - def mock_offset_request(payloads, **kwargs): - raise FailedPayloadsError(payloads[0]) - - client.send_offset_request.side_effect = mock_offset_request - - consumer = SimpleConsumer(client, group='foobar', - topic='topic', partitions=[0, 1], - auto_commit=False) - - # This should not raise an exception - self.assertEqual(consumer.reset_partition_offset(0), None) - - @staticmethod - def fail_requests_factory(error_factory): - # Mock so that only the first request gets a valid response - def fail_requests(payloads, **kwargs): - responses = [ - FetchResponsePayload(payloads[0].topic, payloads[0].partition, 0, 0, - [OffsetAndMessage( - payloads[0].offset + i, - "msg %d" % (payloads[0].offset + i)) - for i in range(10)]), - ] - for failure in payloads[1:]: - responses.append(error_factory(failure)) - return responses - return fail_requests diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index d6fd41c..6e6bc94 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -1,29 +1,17 @@ import logging -import os import time from mock import patch import pytest from kafka.vendor.six.moves import range -from . import unittest -from kafka import ( - KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, - create_gzip_message, KafkaProducer -) import kafka.codec -from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES from kafka.errors import ( - ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError, - KafkaTimeoutError, UnsupportedCodecError -) -from kafka.protocol.message import PartialMessage -from kafka.structs import ( - ProduceRequestPayload, TopicPartition, OffsetAndTimestamp + KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError ) +from kafka.structs import TopicPartition, OffsetAndTimestamp -from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, Timer, assert_message_count, env_kafka_version, random_string +from test.testutil import Timer, assert_message_count, env_kafka_version, random_string @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @@ -63,486 +51,6 @@ def test_kafka_consumer_unsupported_encoding( consumer.poll(timeout_ms=2000) -class TestConsumerIntegration(KafkaIntegrationTestCase): - maxDiff = None - - @classmethod - def setUpClass(cls): - if not os.environ.get('KAFKA_VERSION'): - return - - cls.zk = ZookeeperFixture.instance() - chroot = random_string(10) - cls.server1 = KafkaFixture.instance(0, cls.zk, - zk_chroot=chroot) - cls.server2 = KafkaFixture.instance(1, cls.zk, - zk_chroot=chroot) - - cls.server = cls.server1 # Bootstrapping server - - @classmethod - def tearDownClass(cls): - if not os.environ.get('KAFKA_VERSION'): - return - - cls.server1.close() - cls.server2.close() - cls.zk.close() - - def send_messages(self, partition, messages): - messages = [ create_message(self.msg(str(msg))) for msg in messages ] - produce = ProduceRequestPayload(self.topic, partition, messages = messages) - resp, = 
self.client.send_produce_request([produce]) - self.assertEqual(resp.error, 0) - - return [ x.value for x in messages ] - - def send_gzip_message(self, partition, messages): - message = create_gzip_message([(self.msg(str(msg)), None) for msg in messages]) - produce = ProduceRequestPayload(self.topic, partition, messages = [message]) - resp, = self.client.send_produce_request([produce]) - self.assertEqual(resp.error, 0) - - def assert_message_count(self, messages, num_messages): - # Make sure we got them all - self.assertEqual(len(messages), num_messages) - - # Make sure there are no duplicates - self.assertEqual(len(set(messages)), num_messages) - - def consumer(self, **kwargs): - if os.environ['KAFKA_VERSION'] == "0.8.0": - # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off - kwargs['group'] = None - kwargs['auto_commit'] = False - else: - kwargs.setdefault('group', None) - kwargs.setdefault('auto_commit', False) - - consumer_class = kwargs.pop('consumer', SimpleConsumer) - group = kwargs.pop('group', None) - topic = kwargs.pop('topic', self.topic) - - if consumer_class in [SimpleConsumer, MultiProcessConsumer]: - kwargs.setdefault('iter_timeout', 0) - - return consumer_class(self.client, group, topic, **kwargs) - - def kafka_consumer(self, **configs): - brokers = '%s:%d' % (self.server.host, self.server.port) - consumer = KafkaConsumer(self.topic, - bootstrap_servers=brokers, - **configs) - return consumer - - def kafka_producer(self, **configs): - brokers = '%s:%d' % (self.server.host, self.server.port) - producer = KafkaProducer( - bootstrap_servers=brokers, **configs) - return producer - - def test_simple_consumer(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Start a consumer - consumer = self.consumer() - - self.assert_message_count([ message for message in consumer ], 200) - - consumer.stop() - - def test_simple_consumer_gzip(self): - self.send_gzip_message(0, range(0, 100)) - self.send_gzip_message(1, range(100, 200)) - - # Start a consumer - consumer = self.consumer() - - self.assert_message_count([ message for message in consumer ], 200) - - consumer.stop() - - def test_simple_consumer_smallest_offset_reset(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - consumer = self.consumer(auto_offset_reset='smallest') - # Move fetch offset ahead of 300 message (out of range) - consumer.seek(300, 2) - # Since auto_offset_reset is set to smallest we should read all 200 - # messages from beginning. - self.assert_message_count([message for message in consumer], 200) - - def test_simple_consumer_largest_offset_reset(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Default largest - consumer = self.consumer() - # Move fetch offset ahead of 300 message (out of range) - consumer.seek(300, 2) - # Since auto_offset_reset is set to largest we should not read any - # messages. - self.assert_message_count([message for message in consumer], 0) - # Send 200 new messages to the queue - self.send_messages(0, range(200, 300)) - self.send_messages(1, range(300, 400)) - # Since the offset is set to largest we should read all the new messages. 
- self.assert_message_count([message for message in consumer], 200) - - def test_simple_consumer_no_reset(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Default largest - consumer = self.consumer(auto_offset_reset=None) - # Move fetch offset ahead of 300 message (out of range) - consumer.seek(300, 2) - with self.assertRaises(OffsetOutOfRangeError): - consumer.get_message() - - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_simple_consumer_load_initial_offsets(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Create 1st consumer and change offsets - consumer = self.consumer(group='test_simple_consumer_load_initial_offsets') - self.assertEqual(consumer.offsets, {0: 0, 1: 0}) - consumer.offsets.update({0:51, 1:101}) - # Update counter after manual offsets update - consumer.count_since_commit += 1 - consumer.commit() - - # Create 2nd consumer and check initial offsets - consumer = self.consumer(group='test_simple_consumer_load_initial_offsets', - auto_commit=False) - self.assertEqual(consumer.offsets, {0: 51, 1: 101}) - - def test_simple_consumer__seek(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - consumer = self.consumer() - - # Rewind 10 messages from the end - consumer.seek(-10, 2) - self.assert_message_count([ message for message in consumer ], 10) - - # Rewind 13 messages from the end - consumer.seek(-13, 2) - self.assert_message_count([ message for message in consumer ], 13) - - # Set absolute offset - consumer.seek(100) - self.assert_message_count([ message for message in consumer ], 0) - consumer.seek(100, partition=0) - self.assert_message_count([ message for message in consumer ], 0) - consumer.seek(101, partition=1) - self.assert_message_count([ message for message in consumer ], 0) - consumer.seek(90, partition=0) - self.assert_message_count([ message for message in consumer ], 10) - consumer.seek(20, partition=1) - self.assert_message_count([ message for message in consumer ], 80) - consumer.seek(0, partition=1) - self.assert_message_count([ message for message in consumer ], 100) - - consumer.stop() - - @pytest.mark.skipif(env_kafka_version() >= (2, 0), - reason="SimpleConsumer blocking does not handle PartialMessage change in kafka 2.0+") - def test_simple_consumer_blocking(self): - consumer = self.consumer() - - # Ask for 5 messages, nothing in queue, block 1 second - with Timer() as t: - messages = consumer.get_messages(block=True, timeout=1) - self.assert_message_count(messages, 0) - self.assertGreaterEqual(t.interval, 1) - - self.send_messages(0, range(0, 5)) - self.send_messages(1, range(5, 10)) - - # Ask for 5 messages, 10 in queue. 
Get 5 back, no blocking - with Timer() as t: - messages = consumer.get_messages(count=5, block=True, timeout=3) - self.assert_message_count(messages, 5) - self.assertLess(t.interval, 3) - - # Ask for 10 messages, get 5 back, block 1 second - with Timer() as t: - messages = consumer.get_messages(count=10, block=True, timeout=1) - self.assert_message_count(messages, 5) - self.assertGreaterEqual(t.interval, 1) - - # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 - # second, get 5 back, no blocking - self.send_messages(0, range(0, 3)) - self.send_messages(1, range(3, 5)) - with Timer() as t: - messages = consumer.get_messages(count=10, block=1, timeout=1) - self.assert_message_count(messages, 5) - self.assertLessEqual(t.interval, 1) - - consumer.stop() - - def test_simple_consumer_pending(self): - # make sure that we start with no pending messages - consumer = self.consumer() - self.assertEquals(consumer.pending(), 0) - self.assertEquals(consumer.pending(partitions=[0]), 0) - self.assertEquals(consumer.pending(partitions=[1]), 0) - - # Produce 10 messages to partitions 0 and 1 - self.send_messages(0, range(0, 10)) - self.send_messages(1, range(10, 20)) - - consumer = self.consumer() - - self.assertEqual(consumer.pending(), 20) - self.assertEqual(consumer.pending(partitions=[0]), 10) - self.assertEqual(consumer.pending(partitions=[1]), 10) - - # move to last message, so one partition should have 1 pending - # message and other 0 - consumer.seek(-1, 2) - self.assertEqual(consumer.pending(), 1) - - pending_part1 = consumer.pending(partitions=[0]) - pending_part2 = consumer.pending(partitions=[1]) - self.assertEquals(set([0, 1]), set([pending_part1, pending_part2])) - consumer.stop() - - @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - def test_multi_process_consumer(self): - # Produce 100 messages to partitions 0 and 1 - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - consumer = self.consumer(consumer = MultiProcessConsumer) - - self.assert_message_count([ message for message in consumer ], 200) - - consumer.stop() - - @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - def test_multi_process_consumer_blocking(self): - consumer = self.consumer(consumer = MultiProcessConsumer) - - # Ask for 5 messages, No messages in queue, block 1 second - with Timer() as t: - messages = consumer.get_messages(block=True, timeout=1) - self.assert_message_count(messages, 0) - - self.assertGreaterEqual(t.interval, 1) - - # Send 10 messages - self.send_messages(0, range(0, 10)) - - # Ask for 5 messages, 10 messages in queue, block 0 seconds - with Timer() as t: - messages = consumer.get_messages(count=5, block=True, timeout=5) - self.assert_message_count(messages, 5) - self.assertLessEqual(t.interval, 1) - - # Ask for 10 messages, 5 in queue, block 1 second - with Timer() as t: - messages = consumer.get_messages(count=10, block=True, timeout=1) - self.assert_message_count(messages, 5) - self.assertGreaterEqual(t.interval, 1) - - # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1 - # second, get at least one back, no blocking - self.send_messages(0, range(0, 5)) - with Timer() as t: - messages = consumer.get_messages(count=10, block=1, timeout=1) - received_message_count = len(messages) - self.assertGreaterEqual(received_message_count, 1) - self.assert_message_count(messages, received_message_count) - self.assertLessEqual(t.interval, 1) - - consumer.stop() - - @unittest.skip('MultiProcessConsumer 
deprecated and these tests are flaky') - def test_multi_proc_pending(self): - self.send_messages(0, range(0, 10)) - self.send_messages(1, range(10, 20)) - - # set group to None and auto_commit to False to avoid interactions w/ - # offset commit/fetch apis - consumer = MultiProcessConsumer(self.client, None, self.topic, - auto_commit=False, iter_timeout=0) - - self.assertEqual(consumer.pending(), 20) - self.assertEqual(consumer.pending(partitions=[0]), 10) - self.assertEqual(consumer.pending(partitions=[1]), 10) - - consumer.stop() - - @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_multi_process_consumer_load_initial_offsets(self): - self.send_messages(0, range(0, 10)) - self.send_messages(1, range(10, 20)) - - # Create 1st consumer and change offsets - consumer = self.consumer(group='test_multi_process_consumer_load_initial_offsets') - self.assertEqual(consumer.offsets, {0: 0, 1: 0}) - consumer.offsets.update({0:5, 1:15}) - # Update counter after manual offsets update - consumer.count_since_commit += 1 - consumer.commit() - - # Create 2nd consumer and check initial offsets - consumer = self.consumer(consumer = MultiProcessConsumer, - group='test_multi_process_consumer_load_initial_offsets', - auto_commit=False) - self.assertEqual(consumer.offsets, {0: 5, 1: 15}) - - def test_large_messages(self): - # Produce 10 "normal" size messages - small_messages = self.send_messages(0, [ str(x) for x in range(10) ]) - - # Produce 10 messages that are large (bigger than default fetch size) - large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ]) - - # Brokers prior to 0.11 will return the next message - # if it is smaller than max_bytes (called buffer_size in SimpleConsumer) - # Brokers 0.11 and later that store messages in v2 format - # internally will return the next message only if the - # full MessageSet is smaller than max_bytes. 
- # For that reason, we set the max buffer size to a little more - # than the size of all large messages combined - consumer = self.consumer(max_buffer_size=60000) - - expected_messages = set(small_messages + large_messages) - actual_messages = set([x.message.value for x in consumer - if not isinstance(x.message, PartialMessage)]) - self.assertEqual(expected_messages, actual_messages) - - consumer.stop() - - def test_huge_messages(self): - huge_message, = self.send_messages(0, [ - create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)), - ]) - - # Create a consumer with the default buffer size - consumer = self.consumer() - - # This consumer fails to get the message - with self.assertRaises(ConsumerFetchSizeTooSmall): - consumer.get_message(False, 0.1) - - consumer.stop() - - # Create a consumer with no fetch size limit - big_consumer = self.consumer( - max_buffer_size = None, - partitions = [0], - ) - - # Seek to the last message - big_consumer.seek(-1, 2) - - # Consume giant message successfully - message = big_consumer.get_message(block=False, timeout=10) - self.assertIsNotNone(message) - self.assertEqual(message.message.value, huge_message) - - big_consumer.stop() - - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_offset_behavior__resuming_behavior(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Start a consumer - consumer1 = self.consumer( - group='test_offset_behavior__resuming_behavior', - auto_commit=True, - auto_commit_every_t = None, - auto_commit_every_n = 20, - ) - - # Grab the first 195 messages - output_msgs1 = [ consumer1.get_message().message.value for _ in range(195) ] - self.assert_message_count(output_msgs1, 195) - - # The total offset across both partitions should be at 180 - consumer2 = self.consumer( - group='test_offset_behavior__resuming_behavior', - auto_commit=True, - auto_commit_every_t = None, - auto_commit_every_n = 20, - ) - - # 181-200 - self.assert_message_count([ message for message in consumer2 ], 20) - - consumer1.stop() - consumer2.stop() - - @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky') - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_multi_process_offset_behavior__resuming_behavior(self): - self.send_messages(0, range(0, 100)) - self.send_messages(1, range(100, 200)) - - # Start a consumer - consumer1 = self.consumer( - consumer=MultiProcessConsumer, - group='test_multi_process_offset_behavior__resuming_behavior', - auto_commit=True, - auto_commit_every_t = None, - auto_commit_every_n = 20, - ) - - # Grab the first 195 messages - output_msgs1 = [] - idx = 0 - for message in consumer1: - output_msgs1.append(message.message.value) - idx += 1 - if idx >= 195: - break - self.assert_message_count(output_msgs1, 195) - - # The total offset across both partitions should be at 180 - consumer2 = self.consumer( - consumer=MultiProcessConsumer, - group='test_multi_process_offset_behavior__resuming_behavior', - auto_commit=True, - auto_commit_every_t = None, - auto_commit_every_n = 20, - ) - - # 181-200 - self.assert_message_count([ message for message in consumer2 ], 20) - - consumer1.stop() - consumer2.stop() - - # TODO: Make this a unit test -- should not require integration - def test_fetch_buffer_size(self): - - # Test parameters (see issue 135 / PR 136) - TEST_MESSAGE_SIZE=1048 - INIT_BUFFER_SIZE=1024 - MAX_BUFFER_SIZE=2048 - assert TEST_MESSAGE_SIZE > INIT_BUFFER_SIZE - assert TEST_MESSAGE_SIZE < 
MAX_BUFFER_SIZE - assert MAX_BUFFER_SIZE == 2 * INIT_BUFFER_SIZE - - self.send_messages(0, [ "x" * 1048 ]) - self.send_messages(1, [ "x" * 1048 ]) - - consumer = self.consumer(buffer_size=1024, max_buffer_size=2048) - messages = [ message for message in consumer ] - self.assertEqual(len(messages), 2) - - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages): TIMEOUT_MS = 500 diff --git a/test/test_context.py b/test/test_context.py deleted file mode 100644 index 3d41ba6..0000000 --- a/test/test_context.py +++ /dev/null @@ -1,117 +0,0 @@ -""" -OffsetCommitContext tests. -""" -from . import unittest - -from mock import MagicMock, patch - -from kafka.context import OffsetCommitContext -from kafka.errors import OffsetOutOfRangeError - - -class TestOffsetCommitContext(unittest.TestCase): - """ - OffsetCommitContext tests. - """ - - def setUp(self): - self.client = MagicMock() - self.consumer = MagicMock() - self.topic = "topic" - self.group = "group" - self.partition = 0 - self.consumer.topic = self.topic - self.consumer.group = self.group - self.consumer.client = self.client - self.consumer.offsets = {self.partition: 0} - self.context = OffsetCommitContext(self.consumer) - - def test_noop(self): - """ - Should revert consumer after context exit with no mark() call. - """ - with self.context: - # advance offset - self.consumer.offsets = {self.partition: 1} - - # offset restored - self.assertEqual(self.consumer.offsets, {self.partition: 0}) - # and seek called with relative zero delta - self.assertEqual(self.consumer.seek.call_count, 1) - self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) - - def test_mark(self): - """ - Should remain at marked location ater context exit. - """ - with self.context as context: - context.mark(self.partition, 0) - # advance offset - self.consumer.offsets = {self.partition: 1} - - # offset sent to client - self.assertEqual(self.client.send_offset_commit_request.call_count, 1) - - # offset remains advanced - self.assertEqual(self.consumer.offsets, {self.partition: 1}) - - # and seek called with relative zero delta - self.assertEqual(self.consumer.seek.call_count, 1) - self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) - - def test_mark_multiple(self): - """ - Should remain at highest marked location after context exit. - """ - with self.context as context: - context.mark(self.partition, 0) - context.mark(self.partition, 1) - context.mark(self.partition, 2) - # advance offset - self.consumer.offsets = {self.partition: 3} - - # offset sent to client - self.assertEqual(self.client.send_offset_commit_request.call_count, 1) - - # offset remains advanced - self.assertEqual(self.consumer.offsets, {self.partition: 3}) - - # and seek called with relative zero delta - self.assertEqual(self.consumer.seek.call_count, 1) - self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) - - def test_rollback(self): - """ - Should rollback to initial offsets on context exit with exception. 
- """ - with self.assertRaises(Exception): - with self.context as context: - context.mark(self.partition, 0) - # advance offset - self.consumer.offsets = {self.partition: 1} - - raise Exception("Intentional failure") - - # offset rolled back (ignoring mark) - self.assertEqual(self.consumer.offsets, {self.partition: 0}) - - # and seek called with relative zero delta - self.assertEqual(self.consumer.seek.call_count, 1) - self.assertEqual(self.consumer.seek.call_args[0], (0, 1)) - - def test_out_of_range(self): - """ - Should reset to beginning of valid offsets on `OffsetOutOfRangeError` - """ - def _seek(offset, whence): - # seek must be called with 0, 0 to find the beginning of the range - self.assertEqual(offset, 0) - self.assertEqual(whence, 0) - # set offsets to something different - self.consumer.offsets = {self.partition: 100} - - with patch.object(self.consumer, "seek", _seek): - with self.context: - raise OffsetOutOfRangeError() - - self.assertEqual(self.consumer.offsets, {self.partition: 100}) diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py deleted file mode 100644 index ad7dcb9..0000000 --- a/test/test_failover_integration.py +++ /dev/null @@ -1,240 +0,0 @@ -import logging -import os -import time - -from kafka import SimpleClient, SimpleConsumer, KeyedProducer -from kafka.errors import ( - FailedPayloadsError, KafkaConnectionError, RequestTimedOutError, - NotLeaderForPartitionError) -from kafka.producer.base import Producer -from kafka.structs import TopicPartition - -from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, random_string - - -log = logging.getLogger(__name__) - - -class TestFailover(KafkaIntegrationTestCase): - create_client = False - - def setUp(self): - if not os.environ.get('KAFKA_VERSION'): - self.skipTest('integration test requires KAFKA_VERSION') - - zk_chroot = random_string(10) - replicas = 3 - partitions = 3 - - # mini zookeeper, 3 kafka brokers - self.zk = ZookeeperFixture.instance() - kk_kwargs = {'zk_chroot': zk_chroot, 'replicas': replicas, - 'partitions': partitions} - self.brokers = [KafkaFixture.instance(i, self.zk, **kk_kwargs) - for i in range(replicas)] - - hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers] - self.client = SimpleClient(hosts, timeout=2) - super(TestFailover, self).setUp() - - def tearDown(self): - super(TestFailover, self).tearDown() - if not os.environ.get('KAFKA_VERSION'): - return - - self.client.close() - for broker in self.brokers: - broker.close() - self.zk.close() - - def test_switch_leader(self): - topic = self.topic - partition = 0 - - # Testing the base Producer class here so that we can easily send - # messages to a specific partition, kill the leader for that partition - # and check that after another broker takes leadership the producer - # is able to resume sending messages - - # require that the server commit messages to all in-sync replicas - # so that failover doesn't lose any messages on server-side - # and we can assert that server-side message count equals client-side - producer = Producer(self.client, async_send=False, - req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) - - # Send 100 random messages to a specific partition - self._send_random_messages(producer, topic, partition, 100) - - # kill leader for partition - self._kill_leader(topic, partition) - - # expect failure, but don't wait more than 60 secs to recover - recovered = False - started = time.time() - timeout = 60 - while not recovered and (time.time() - 
started) < timeout: - try: - log.debug("attempting to send 'success' message after leader killed") - producer.send_messages(topic, partition, b'success') - log.debug("success!") - recovered = True - except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError, - NotLeaderForPartitionError): - log.debug("caught exception sending message -- will retry") - continue - - # Verify we successfully sent the message - self.assertTrue(recovered) - - # send some more messages to new leader - self._send_random_messages(producer, topic, partition, 100) - - # count number of messages - # Should be equal to 100 before + 1 recovery + 100 after - # at_least=True because exactly once delivery isn't really a thing - self.assert_message_count(topic, 201, partitions=(partition,), - at_least=True) - - def test_switch_leader_async(self): - topic = self.topic - partition = 0 - - # Test the base class Producer -- send_messages to a specific partition - producer = Producer(self.client, async_send=True, - batch_send_every_n=15, - batch_send_every_t=3, - req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT, - async_log_messages_on_error=False) - - # Send 10 random messages - self._send_random_messages(producer, topic, partition, 10) - self._send_random_messages(producer, topic, partition + 1, 10) - - # kill leader for partition - self._kill_leader(topic, partition) - - log.debug("attempting to send 'success' message after leader killed") - - # in async mode, this should return immediately - producer.send_messages(topic, partition, b'success') - producer.send_messages(topic, partition + 1, b'success') - - # send to new leader - self._send_random_messages(producer, topic, partition, 10) - self._send_random_messages(producer, topic, partition + 1, 10) - - # Stop the producer and wait for it to shutdown - producer.stop() - started = time.time() - timeout = 60 - while (time.time() - started) < timeout: - if not producer.thread.is_alive(): - break - time.sleep(0.1) - else: - self.fail('timeout waiting for producer queue to empty') - - # count number of messages - # Should be equal to 10 before + 1 recovery + 10 after - # at_least=True because exactly once delivery isn't really a thing - self.assert_message_count(topic, 21, partitions=(partition,), - at_least=True) - self.assert_message_count(topic, 21, partitions=(partition + 1,), - at_least=True) - - def test_switch_leader_keyed_producer(self): - topic = self.topic - - producer = KeyedProducer(self.client, async_send=False) - - # Send 10 random messages - for _ in range(10): - key = random_string(3).encode('utf-8') - msg = random_string(10).encode('utf-8') - producer.send_messages(topic, key, msg) - - # kill leader for partition 0 - self._kill_leader(topic, 0) - - recovered = False - started = time.time() - timeout = 60 - while not recovered and (time.time() - started) < timeout: - try: - key = random_string(3).encode('utf-8') - msg = random_string(10).encode('utf-8') - producer.send_messages(topic, key, msg) - if producer.partitioners[topic].partition(key) == 0: - recovered = True - except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError, - NotLeaderForPartitionError): - log.debug("caught exception sending message -- will retry") - continue - - # Verify we successfully sent the message - self.assertTrue(recovered) - - # send some more messages just to make sure no more exceptions - for _ in range(10): - key = random_string(3).encode('utf-8') - msg = random_string(10).encode('utf-8') - producer.send_messages(topic, key, msg) - - def 
test_switch_leader_simple_consumer(self): - producer = Producer(self.client, async_send=False) - consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10) - self._send_random_messages(producer, self.topic, 0, 2) - consumer.get_messages() - self._kill_leader(self.topic, 0) - consumer.get_messages() - - def _send_random_messages(self, producer, topic, partition, n): - for j in range(n): - msg = 'msg {0}: {1}'.format(j, random_string(10)) - log.debug('_send_random_message %s to %s:%d', msg, topic, partition) - while True: - try: - producer.send_messages(topic, partition, msg.encode('utf-8')) - except Exception: - log.exception('failure in _send_random_messages - retrying') - continue - else: - break - - def _kill_leader(self, topic, partition): - leader = self.client.topics_to_brokers[TopicPartition(topic, partition)] - broker = self.brokers[leader.nodeId] - broker.close() - return broker - - def assert_message_count(self, topic, check_count, timeout=10, - partitions=None, at_least=False): - hosts = ','.join(['%s:%d' % (broker.host, broker.port) - for broker in self.brokers]) - - client = SimpleClient(hosts, timeout=2) - consumer = SimpleConsumer(client, None, topic, - partitions=partitions, - auto_commit=False, - iter_timeout=timeout) - - started_at = time.time() - pending = -1 - while pending < check_count and (time.time() - started_at < timeout): - try: - pending = consumer.pending(partitions) - except FailedPayloadsError: - pass - time.sleep(0.5) - - consumer.stop() - client.close() - - if pending < check_count: - self.fail('Too few pending messages: found %d, expected %d' % - (pending, check_count)) - elif pending > check_count and not at_least: - self.fail('Too many pending messages: found %d, expected %d' % - (pending, check_count)) - return True diff --git a/test/test_package.py b/test/test_package.py index e520f3f..aa42c9c 100644 --- a/test/test_package.py +++ b/test/test_package.py @@ -6,20 +6,20 @@ class TestPackage: assert kafka1.codec.__name__ == "kafka.codec" def test_submodule_namespace(self): - import kafka.client as client1 - assert client1.__name__ == "kafka.client" + import kafka.client_async as client1 + assert client1.__name__ == "kafka.client_async" - from kafka import client as client2 - assert client2.__name__ == "kafka.client" + from kafka import client_async as client2 + assert client2.__name__ == "kafka.client_async" - from kafka.client import SimpleClient as SimpleClient1 - assert SimpleClient1.__name__ == "SimpleClient" + from kafka.client_async import KafkaClient as KafkaClient1 + assert KafkaClient1.__name__ == "KafkaClient" + + from kafka import KafkaClient as KafkaClient2 + assert KafkaClient2.__name__ == "KafkaClient" from kafka.codec import gzip_encode as gzip_encode1 assert gzip_encode1.__name__ == "gzip_encode" - from kafka import SimpleClient as SimpleClient2 - assert SimpleClient2.__name__ == "SimpleClient" - from kafka.codec import snappy_encode assert snappy_encode.__name__ == "snappy_encode" diff --git a/test/test_partitioner.py b/test/test_partitioner.py index 3a5264b..853fbf6 100644 --- a/test/test_partitioner.py +++ b/test/test_partitioner.py @@ -2,8 +2,7 @@ from __future__ import absolute_import import pytest -from kafka.partitioner import DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner -from kafka.partitioner.hashed import murmur2 +from kafka.partitioner import DefaultPartitioner, murmur2 def test_default_partitioner(): @@ -22,45 +21,15 @@ def test_default_partitioner(): assert 
partitioner(None, all_partitions, []) in all_partitions -def test_roundrobin_partitioner(): - partitioner = RoundRobinPartitioner() - all_partitions = available = list(range(100)) - # partitioner should cycle between partitions - i = 0 - max_partition = all_partitions[len(all_partitions) - 1] - while i <= max_partition: - assert i == partitioner(None, all_partitions, available) - i += 1 - - i = 0 - while i <= int(max_partition / 2): - assert i == partitioner(None, all_partitions, available) - i += 1 - - # test dynamic partition re-assignment - available = available[:-25] - - while i <= max(available): - assert i == partitioner(None, all_partitions, available) - i += 1 - - all_partitions = list(range(200)) - available = all_partitions - - max_partition = all_partitions[len(all_partitions) - 1] - while i <= max_partition: - assert i == partitioner(None, all_partitions, available) - i += 1 - - @pytest.mark.parametrize("bytes_payload,partition_number", [ (b'', 681), (b'a', 524), (b'ab', 434), (b'abc', 107), (b'123456789', 566), (b'\x00 ', 742) ]) def test_murmur2_java_compatibility(bytes_payload, partition_number): - p = Murmur2Partitioner(range(1000)) + partitioner = DefaultPartitioner() + all_partitions = available = list(range(1000)) # compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner - assert p.partition(bytes_payload) == partition_number + assert partitioner(bytes_payload, all_partitions, available) == partition_number def test_murmur2_not_ascii(): diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py deleted file mode 100644 index 8f32cf8..0000000 --- a/test/test_producer_integration.py +++ /dev/null @@ -1,529 +0,0 @@ -import os -import time -import uuid - -import pytest -from kafka.vendor.six.moves import range - -from kafka import ( - SimpleProducer, KeyedProducer, - create_message, create_gzip_message, create_snappy_message, - RoundRobinPartitioner, HashedPartitioner -) -from kafka.codec import has_snappy -from kafka.errors import UnknownTopicOrPartitionError, LeaderNotAvailableError -from kafka.producer.base import Producer -from kafka.protocol.message import PartialMessage -from kafka.structs import FetchRequestPayload, ProduceRequestPayload - -from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset - - -# TODO: This duplicates a TestKafkaProducerIntegration method temporarily -# while the migration to pytest is in progress -def assert_produce_request(client, topic, messages, initial_offset, message_ct, - partition=0): - """Verify the correctness of a produce request - """ - produce = ProduceRequestPayload(topic, partition, messages=messages) - - # There should only be one response message from the server. - # This will throw an exception if there's more than one. 
- resp = client.send_produce_request([produce]) - assert_produce_response(resp, initial_offset) - - assert current_offset(client, topic, partition) == initial_offset + message_ct - - -def assert_produce_response(resp, initial_offset): - """Verify that a produce response is well-formed - """ - assert len(resp) == 1 - assert resp[0].error == 0 - assert resp[0].offset == initial_offset - - -@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -def test_produce_many_simple(simple_client, topic): - """Test multiple produces using the SimpleClient - """ - start_offset = current_offset(simple_client, topic, 0) - - assert_produce_request( - simple_client, topic, - [create_message(("Test message %d" % i).encode('utf-8')) - for i in range(100)], - start_offset, - 100, - ) - - assert_produce_request( - simple_client, topic, - [create_message(("Test message %d" % i).encode('utf-8')) - for i in range(100)], - start_offset+100, - 100, - ) - - -class TestKafkaProducerIntegration(KafkaIntegrationTestCase): - - @classmethod - def setUpClass(cls): # noqa - if not os.environ.get('KAFKA_VERSION'): - return - - cls.zk = ZookeeperFixture.instance() - cls.server = KafkaFixture.instance(0, cls.zk) - - @classmethod - def tearDownClass(cls): # noqa - if not os.environ.get('KAFKA_VERSION'): - return - - cls.server.close() - cls.zk.close() - - def test_produce_10k_simple(self): - start_offset = self.current_offset(self.topic, 0) - - self.assert_produce_request( - [create_message(("Test message %d" % i).encode('utf-8')) - for i in range(10000)], - start_offset, - 10000, - ) - - def test_produce_many_gzip(self): - start_offset = self.current_offset(self.topic, 0) - - message1 = create_gzip_message([ - (("Gzipped 1 %d" % i).encode('utf-8'), None) for i in range(100)]) - message2 = create_gzip_message([ - (("Gzipped 2 %d" % i).encode('utf-8'), None) for i in range(100)]) - - self.assert_produce_request( - [ message1, message2 ], - start_offset, - 200, - ) - - def test_produce_many_snappy(self): - self.skipTest("All snappy integration tests fail with nosnappyjava") - start_offset = self.current_offset(self.topic, 0) - - self.assert_produce_request([ - create_snappy_message([("Snappy 1 %d" % i, None) for i in range(100)]), - create_snappy_message([("Snappy 2 %d" % i, None) for i in range(100)]), - ], - start_offset, - 200, - ) - - def test_produce_mixed(self): - start_offset = self.current_offset(self.topic, 0) - - msg_count = 1+100 - messages = [ - create_message(b"Just a plain message"), - create_gzip_message([ - (("Gzipped %d" % i).encode('utf-8'), None) for i in range(100)]), - ] - - # All snappy integration tests fail with nosnappyjava - if False and has_snappy(): - msg_count += 100 - messages.append(create_snappy_message([("Snappy %d" % i, None) for i in range(100)])) - - self.assert_produce_request(messages, start_offset, msg_count) - - def test_produce_100k_gzipped(self): - start_offset = self.current_offset(self.topic, 0) - - self.assert_produce_request([ - create_gzip_message([ - (("Gzipped batch 1, message %d" % i).encode('utf-8'), None) - for i in range(50000)]) - ], - start_offset, - 50000, - ) - - self.assert_produce_request([ - create_gzip_message([ - (("Gzipped batch 1, message %d" % i).encode('utf-8'), None) - for i in range(50000)]) - ], - start_offset+50000, - 50000, - ) - - ############################ - # SimpleProducer Tests # - ############################ - - def test_simple_producer_new_topic(self): - producer = SimpleProducer(self.client) - resp = 
producer.send_messages('new_topic', self.msg('foobar')) - self.assert_produce_response(resp, 0) - producer.stop() - - def test_simple_producer(self): - partitions = self.client.get_partition_ids_for_topic(self.topic) - start_offsets = [self.current_offset(self.topic, p) for p in partitions] - - producer = SimpleProducer(self.client, random_start=False) - - # Goes to first partition, randomly. - resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) - self.assert_produce_response(resp, start_offsets[0]) - - # Goes to the next partition, randomly. - resp = producer.send_messages(self.topic, self.msg("three")) - self.assert_produce_response(resp, start_offsets[1]) - - self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two") ]) - self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("three") ]) - - # Goes back to the first partition because there's only two partitions - resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five")) - self.assert_produce_response(resp, start_offsets[0]+2) - self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ]) - - producer.stop() - - def test_producer_random_order(self): - producer = SimpleProducer(self.client, random_start=True) - resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) - resp2 = producer.send_messages(self.topic, self.msg("three")) - resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five")) - - self.assertEqual(resp1[0].partition, resp3[0].partition) - self.assertNotEqual(resp1[0].partition, resp2[0].partition) - - def test_producer_ordered_start(self): - producer = SimpleProducer(self.client, random_start=False) - resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two")) - resp2 = producer.send_messages(self.topic, self.msg("three")) - resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five")) - - self.assertEqual(resp1[0].partition, 0) - self.assertEqual(resp2[0].partition, 1) - self.assertEqual(resp3[0].partition, 0) - - def test_async_simple_producer(self): - partition = self.client.get_partition_ids_for_topic(self.topic)[0] - start_offset = self.current_offset(self.topic, partition) - - producer = SimpleProducer(self.client, async_send=True, random_start=False) - resp = producer.send_messages(self.topic, self.msg("one")) - self.assertEqual(len(resp), 0) - - # flush messages - producer.stop() - - self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - - - def test_batched_simple_producer__triggers_by_message(self): - partitions = self.client.get_partition_ids_for_topic(self.topic) - start_offsets = [self.current_offset(self.topic, p) for p in partitions] - - # Configure batch producer - batch_messages = 5 - batch_interval = 5 - producer = SimpleProducer( - self.client, - async_send=True, - batch_send_every_n=batch_messages, - batch_send_every_t=batch_interval, - random_start=False) - - # Send 4 messages -- should not trigger a batch - resp = producer.send_messages( - self.topic, - self.msg("one"), - self.msg("two"), - self.msg("three"), - self.msg("four"), - ) - - # Batch mode is async. 
No ack - self.assertEqual(len(resp), 0) - - # It hasn't sent yet - self.assert_fetch_offset(partitions[0], start_offsets[0], []) - self.assert_fetch_offset(partitions[1], start_offsets[1], []) - - # send 3 more messages -- should trigger batch on first 5 - resp = producer.send_messages( - self.topic, - self.msg("five"), - self.msg("six"), - self.msg("seven"), - ) - - # Batch mode is async. No ack - self.assertEqual(len(resp), 0) - - # Wait until producer has pulled all messages from internal queue - # this should signal that the first batch was sent, and the producer - # is now waiting for enough messages to batch again (or a timeout) - timeout = 5 - start = time.time() - while not producer.queue.empty(): - if time.time() - start > timeout: - self.fail('timeout waiting for producer queue to empty') - time.sleep(0.1) - - # send messages groups all *msgs in a single call to the same partition - # so we should see all messages from the first call in one partition - self.assert_fetch_offset(partitions[0], start_offsets[0], [ - self.msg("one"), - self.msg("two"), - self.msg("three"), - self.msg("four"), - ]) - - # Because we are batching every 5 messages, we should only see one - self.assert_fetch_offset(partitions[1], start_offsets[1], [ - self.msg("five"), - ]) - - producer.stop() - - def test_batched_simple_producer__triggers_by_time(self): - self.skipTest("Flakey test -- should be refactored or removed") - partitions = self.client.get_partition_ids_for_topic(self.topic) - start_offsets = [self.current_offset(self.topic, p) for p in partitions] - - batch_interval = 5 - producer = SimpleProducer( - self.client, - async_send=True, - batch_send_every_n=100, - batch_send_every_t=batch_interval, - random_start=False) - - # Send 5 messages and do a fetch - resp = producer.send_messages( - self.topic, - self.msg("one"), - self.msg("two"), - self.msg("three"), - self.msg("four"), - ) - - # Batch mode is async. No ack - self.assertEqual(len(resp), 0) - - # It hasn't sent yet - self.assert_fetch_offset(partitions[0], start_offsets[0], []) - self.assert_fetch_offset(partitions[1], start_offsets[1], []) - - resp = producer.send_messages(self.topic, - self.msg("five"), - self.msg("six"), - self.msg("seven"), - ) - - # Batch mode is async. 
No ack - self.assertEqual(len(resp), 0) - - # Wait the timeout out - time.sleep(batch_interval) - - self.assert_fetch_offset(partitions[0], start_offsets[0], [ - self.msg("one"), - self.msg("two"), - self.msg("three"), - self.msg("four"), - ]) - - self.assert_fetch_offset(partitions[1], start_offsets[1], [ - self.msg("five"), - self.msg("six"), - self.msg("seven"), - ]) - - producer.stop() - - - ############################ - # KeyedProducer Tests # - ############################ - - @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") - def test_keyedproducer_null_payload(self): - partitions = self.client.get_partition_ids_for_topic(self.topic) - start_offsets = [self.current_offset(self.topic, p) for p in partitions] - - producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) - key = "test" - - resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) - self.assert_produce_response(resp, start_offsets[0]) - resp = producer.send_messages(self.topic, self.key("key2"), None) - self.assert_produce_response(resp, start_offsets[1]) - resp = producer.send_messages(self.topic, self.key("key3"), None) - self.assert_produce_response(resp, start_offsets[0]+1) - resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four")) - self.assert_produce_response(resp, start_offsets[1]+1) - - self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ]) - self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ]) - - producer.stop() - - def test_round_robin_partitioner(self): - partitions = self.client.get_partition_ids_for_topic(self.topic) - start_offsets = [self.current_offset(self.topic, p) for p in partitions] - - producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner) - resp1 = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) - resp2 = producer.send_messages(self.topic, self.key("key2"), self.msg("two")) - resp3 = producer.send_messages(self.topic, self.key("key3"), self.msg("three")) - resp4 = producer.send_messages(self.topic, self.key("key4"), self.msg("four")) - - self.assert_produce_response(resp1, start_offsets[0]+0) - self.assert_produce_response(resp2, start_offsets[1]+0) - self.assert_produce_response(resp3, start_offsets[0]+1) - self.assert_produce_response(resp4, start_offsets[1]+1) - - self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("three") ]) - self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("two"), self.msg("four") ]) - - producer.stop() - - def test_hashed_partitioner(self): - partitions = self.client.get_partition_ids_for_topic(self.topic) - start_offsets = [self.current_offset(self.topic, p) for p in partitions] - - producer = KeyedProducer(self.client, partitioner=HashedPartitioner) - resp1 = producer.send_messages(self.topic, self.key("1"), self.msg("one")) - resp2 = producer.send_messages(self.topic, self.key("2"), self.msg("two")) - resp3 = producer.send_messages(self.topic, self.key("3"), self.msg("three")) - resp4 = producer.send_messages(self.topic, self.key("3"), self.msg("four")) - resp5 = producer.send_messages(self.topic, self.key("4"), self.msg("five")) - - offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]} - messages = {partitions[0]: [], partitions[1]: []} - - keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]] - resps = [resp1, resp2, resp3, resp4, resp5] - msgs = [self.msg(m) for m in ["one", "two", "three", "four", 
"five"]] - - for key, resp, msg in zip(keys, resps, msgs): - k = hash(key) % 2 - partition = partitions[k] - offset = offsets[partition] - self.assert_produce_response(resp, offset) - offsets[partition] += 1 - messages[partition].append(msg) - - self.assert_fetch_offset(partitions[0], start_offsets[0], messages[partitions[0]]) - self.assert_fetch_offset(partitions[1], start_offsets[1], messages[partitions[1]]) - - producer.stop() - - def test_async_keyed_producer(self): - partition = self.client.get_partition_ids_for_topic(self.topic)[0] - start_offset = self.current_offset(self.topic, partition) - - producer = KeyedProducer(self.client, - partitioner=RoundRobinPartitioner, - async_send=True, - batch_send_every_t=1) - - resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one")) - self.assertEqual(len(resp), 0) - - # wait for the server to report a new highwatermark - while self.current_offset(self.topic, partition) == start_offset: - time.sleep(0.1) - - self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - - producer.stop() - - ############################ - # Producer ACK Tests # - ############################ - - def test_acks_none(self): - partition = self.client.get_partition_ids_for_topic(self.topic)[0] - start_offset = self.current_offset(self.topic, partition) - - producer = Producer( - self.client, - req_acks=Producer.ACK_NOT_REQUIRED, - ) - resp = producer.send_messages(self.topic, partition, self.msg("one")) - - # No response from produce request with no acks required - self.assertEqual(len(resp), 0) - - # But the message should still have been delivered - self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - producer.stop() - - def test_acks_local_write(self): - partition = self.client.get_partition_ids_for_topic(self.topic)[0] - start_offset = self.current_offset(self.topic, partition) - - producer = Producer( - self.client, - req_acks=Producer.ACK_AFTER_LOCAL_WRITE, - ) - resp = producer.send_messages(self.topic, partition, self.msg("one")) - - self.assert_produce_response(resp, start_offset) - self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - - producer.stop() - - def test_acks_cluster_commit(self): - partition = self.client.get_partition_ids_for_topic(self.topic)[0] - start_offset = self.current_offset(self.topic, partition) - - producer = Producer( - self.client, - req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT, - ) - - resp = producer.send_messages(self.topic, partition, self.msg("one")) - self.assert_produce_response(resp, start_offset) - self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - - producer.stop() - - def assert_produce_request(self, messages, initial_offset, message_ct, - partition=0): - produce = ProduceRequestPayload(self.topic, partition, messages=messages) - - # There should only be one response message from the server. - # This will throw an exception if there's more than one. - resp = self.client.send_produce_request([ produce ]) - self.assert_produce_response(resp, initial_offset) - - self.assertEqual(self.current_offset(self.topic, partition), initial_offset + message_ct) - - def assert_produce_response(self, resp, initial_offset): - self.assertEqual(len(resp), 1) - self.assertEqual(resp[0].error, 0) - self.assertEqual(resp[0].offset, initial_offset) - - def assert_fetch_offset(self, partition, start_offset, expected_messages): - # There should only be one response message from the server. - # This will throw an exception if there's more than one. 
- - resp, = self.client.send_fetch_request([FetchRequestPayload(self.topic, partition, start_offset, 1024)]) - - self.assertEqual(resp.error, 0) - self.assertEqual(resp.partition, partition) - messages = [ x.message.value for x in resp.messages - if not isinstance(x.message, PartialMessage) ] - - self.assertEqual(messages, expected_messages) - self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages)) diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py deleted file mode 100644 index ab80ee7..0000000 --- a/test/test_producer_legacy.py +++ /dev/null @@ -1,257 +0,0 @@ -# -*- coding: utf-8 -*- - -import collections -import logging -import threading -import time - -from mock import MagicMock, patch -from . import unittest - -from kafka import SimpleClient, SimpleProducer, KeyedProducer -from kafka.errors import ( - AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError) -from kafka.producer.base import Producer, _send_upstream -from kafka.protocol import CODEC_NONE -from kafka.structs import ( - ProduceResponsePayload, RetryOptions, TopicPartition) - -from kafka.vendor.six.moves import queue, range - - -class TestKafkaProducer(unittest.TestCase): - def test_producer_message_types(self): - - producer = Producer(MagicMock()) - topic = b"test-topic" - partition = 0 - - bad_data_types = (u'ä½ æ€Žä¹ˆæ ·?', 12, ['a', 'list'], - ('a', 'tuple'), {'a': 'dict'}, None,) - for m in bad_data_types: - with self.assertRaises(TypeError): - logging.debug("attempting to send message of type %s", type(m)) - producer.send_messages(topic, partition, m) - - good_data_types = (b'a string!',) - for m in good_data_types: - # This should not raise an exception - producer.send_messages(topic, partition, m) - - def test_keyedproducer_message_types(self): - client = MagicMock() - client.get_partition_ids_for_topic.return_value = [0, 1] - producer = KeyedProducer(client) - topic = b"test-topic" - key = b"testkey" - - bad_data_types = (u'ä½ æ€Žä¹ˆæ ·?', 12, ['a', 'list'], - ('a', 'tuple'), {'a': 'dict'},) - for m in bad_data_types: - with self.assertRaises(TypeError): - logging.debug("attempting to send message of type %s", type(m)) - producer.send_messages(topic, key, m) - - good_data_types = (b'a string!', None,) - for m in good_data_types: - # This should not raise an exception - producer.send_messages(topic, key, m) - - def test_topic_message_types(self): - client = MagicMock() - - def partitions(topic): - return [0, 1] - - client.get_partition_ids_for_topic = partitions - - producer = SimpleProducer(client, random_start=False) - topic = b"test-topic" - producer.send_messages(topic, b'hi') - assert client.send_produce_request.called - - @patch('kafka.producer.base._send_upstream') - def test_producer_async_queue_overfilled(self, mock): - queue_size = 2 - producer = Producer(MagicMock(), async_send=True, - async_queue_maxsize=queue_size) - - topic = b'test-topic' - partition = 0 - message = b'test-message' - - with self.assertRaises(AsyncProducerQueueFull): - message_list = [message] * (queue_size + 1) - producer.send_messages(topic, partition, *message_list) - self.assertEqual(producer.queue.qsize(), queue_size) - for _ in range(producer.queue.qsize()): - producer.queue.get() - - def test_producer_sync_fail_on_error(self): - error = FailedPayloadsError('failure') - with patch.object(SimpleClient, 'load_metadata_for_topics'): - with patch.object(SimpleClient, 'ensure_topic_exists'): - with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]): - 
with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]): - - client = SimpleClient(MagicMock()) - producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False) - - # This should not raise - (response,) = producer.send_messages('foobar', b'test message') - self.assertEqual(response, error) - - producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True) - with self.assertRaises(FailedPayloadsError): - producer.send_messages('foobar', b'test message') - - def test_cleanup_is_not_called_on_stopped_producer(self): - producer = Producer(MagicMock(), async_send=True) - producer.stopped = True - with patch.object(producer, 'stop') as mocked_stop: - producer._cleanup_func(producer) - self.assertEqual(mocked_stop.call_count, 0) - - def test_cleanup_is_called_on_running_producer(self): - producer = Producer(MagicMock(), async_send=True) - producer.stopped = False - with patch.object(producer, 'stop') as mocked_stop: - producer._cleanup_func(producer) - self.assertEqual(mocked_stop.call_count, 1) - - -class TestKafkaProducerSendUpstream(unittest.TestCase): - - def setUp(self): - self.client = MagicMock() - self.queue = queue.Queue() - - def _run_process(self, retries_limit=3, sleep_timeout=1): - # run _send_upstream process with the queue - stop_event = threading.Event() - retry_options = RetryOptions(limit=retries_limit, - backoff_ms=50, - retry_on_timeouts=False) - self.thread = threading.Thread( - target=_send_upstream, - args=(self.queue, self.client, CODEC_NONE, - 0.3, # batch time (seconds) - 3, # batch length - Producer.ACK_AFTER_LOCAL_WRITE, - Producer.DEFAULT_ACK_TIMEOUT, - retry_options, - stop_event)) - self.thread.daemon = True - self.thread.start() - time.sleep(sleep_timeout) - stop_event.set() - - def test_wo_retries(self): - - # lets create a queue and add 10 messages for 1 partition - for i in range(10): - self.queue.put((TopicPartition("test", 0), "msg %i", "key %i")) - - self._run_process() - - # the queue should be void at the end of the test - self.assertEqual(self.queue.empty(), True) - - # there should be 4 non-void cals: - # 3 batches of 3 msgs each + 1 batch of 1 message - self.assertEqual(self.client.send_produce_request.call_count, 4) - - def test_first_send_failed(self): - - # lets create a queue and add 10 messages for 10 different partitions - # to show how retries should work ideally - for i in range(10): - self.queue.put((TopicPartition("test", i), "msg %i", "key %i")) - - # Mock offsets counter for closure - offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0)) - self.client.is_first_time = True - def send_side_effect(reqs, *args, **kwargs): - if self.client.is_first_time: - self.client.is_first_time = False - return [FailedPayloadsError(req) for req in reqs] - responses = [] - for req in reqs: - offset = offsets[req.topic][req.partition] - offsets[req.topic][req.partition] += len(req.messages) - responses.append( - ProduceResponsePayload(req.topic, req.partition, 0, offset) - ) - return responses - - self.client.send_produce_request.side_effect = send_side_effect - - self._run_process(2) - - # the queue should be void at the end of the test - self.assertEqual(self.queue.empty(), True) - - # there should be 5 non-void calls: 1st failed batch of 3 msgs - # plus 3 batches of 3 msgs each + 1 batch of 1 message - self.assertEqual(self.client.send_produce_request.call_count, 5) - - def test_with_limited_retries(self): - - # lets create a queue and add 10 messages for 10 different 
partitions - # to show how retries should work ideally - for i in range(10): - self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i)) - - def send_side_effect(reqs, *args, **kwargs): - return [FailedPayloadsError(req) for req in reqs] - - self.client.send_produce_request.side_effect = send_side_effect - - self._run_process(3, 3) - - # the queue should be void at the end of the test - self.assertEqual(self.queue.empty(), True) - - # there should be 16 non-void calls: - # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg + - # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16 - self.assertEqual(self.client.send_produce_request.call_count, 16) - - def test_async_producer_not_leader(self): - - for i in range(10): - self.queue.put((TopicPartition("test", i), "msg %i", "key %i")) - - # Mock offsets counter for closure - offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0)) - self.client.is_first_time = True - def send_side_effect(reqs, *args, **kwargs): - if self.client.is_first_time: - self.client.is_first_time = False - return [ProduceResponsePayload(req.topic, req.partition, - NotLeaderForPartitionError.errno, -1) - for req in reqs] - - responses = [] - for req in reqs: - offset = offsets[req.topic][req.partition] - offsets[req.topic][req.partition] += len(req.messages) - responses.append( - ProduceResponsePayload(req.topic, req.partition, 0, offset) - ) - return responses - - self.client.send_produce_request.side_effect = send_side_effect - - self._run_process(2) - - # the queue should be void at the end of the test - self.assertEqual(self.queue.empty(), True) - - # there should be 5 non-void calls: 1st failed batch of 3 msgs - # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5 - self.assertEqual(self.client.send_produce_request.call_count, 5) - - def tearDown(self): - for _ in range(self.queue.qsize()): - self.queue.get() diff --git a/test/test_protocol_legacy.py b/test/test_protocol_legacy.py deleted file mode 100644 index 1341af0..0000000 --- a/test/test_protocol_legacy.py +++ /dev/null @@ -1,848 +0,0 @@ -#pylint: skip-file -from contextlib import contextmanager -import struct - -from kafka.vendor import six -from mock import patch, sentinel -from . 
import unittest - -from kafka.codec import has_snappy, gzip_decode, snappy_decode -from kafka.errors import ( - ChecksumError, KafkaUnavailableError, UnsupportedCodecError, - ConsumerFetchSizeTooSmall, ProtocolError) -from kafka.protocol import ( - ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol, - create_message, create_gzip_message, create_snappy_message, - create_message_set) -from kafka.structs import ( - OffsetRequestPayload, OffsetResponsePayload, - OffsetCommitRequestPayload, OffsetCommitResponsePayload, - OffsetFetchRequestPayload, OffsetFetchResponsePayload, - ProduceRequestPayload, ProduceResponsePayload, - FetchRequestPayload, FetchResponsePayload, - Message, OffsetAndMessage, BrokerMetadata, ConsumerMetadataResponse) - - -class TestProtocol(unittest.TestCase): - def test_create_message(self): - payload = "test" - key = "key" - msg = create_message(payload, key) - self.assertEqual(msg.magic, 0) - self.assertEqual(msg.attributes, 0) - self.assertEqual(msg.key, key) - self.assertEqual(msg.value, payload) - - def test_create_gzip(self): - payloads = [(b"v1", None), (b"v2", None)] - msg = create_gzip_message(payloads) - self.assertEqual(msg.magic, 0) - self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP) - self.assertEqual(msg.key, None) - # Need to decode to check since gzipped payload is non-deterministic - decoded = gzip_decode(msg.value) - expect = b"".join([ - struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 16), # MsgSet size - struct.pack(">i", 1285512130), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", -1), # -1 indicates a null key - struct.pack(">i", 2), # Msg length (bytes) - b"v1", # Message contents - - struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 16), # MsgSet size - struct.pack(">i", -711587208), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", -1), # -1 indicates a null key - struct.pack(">i", 2), # Msg length (bytes) - b"v2", # Message contents - ]) - - self.assertEqual(decoded, expect) - - def test_create_gzip_keyed(self): - payloads = [(b"v1", b"k1"), (b"v2", b"k2")] - msg = create_gzip_message(payloads) - self.assertEqual(msg.magic, 0) - self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP) - self.assertEqual(msg.key, None) - # Need to decode to check since gzipped payload is non-deterministic - decoded = gzip_decode(msg.value) - expect = b"".join([ - struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", 1474775406), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - b"k1", # Key - struct.pack(">i", 2), # Length of value - b"v1", # Value - - struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", -16383415), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - b"k2", # Key - struct.pack(">i", 2), # Length of value - b"v2", # Value - ]) - - self.assertEqual(decoded, expect) - - @unittest.skipUnless(has_snappy(), "Snappy not available") - def test_create_snappy(self): - payloads = [(b"v1", None), (b"v2", None)] - msg = create_snappy_message(payloads) - self.assertEqual(msg.magic, 0) - self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY) - self.assertEqual(msg.key, None) - decoded = snappy_decode(msg.value) - expect = b"".join([ - struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 16), # MsgSet size - struct.pack(">i", 1285512130), # CRC - struct.pack(">bb", 0, 0), # 
Magic, flags - struct.pack(">i", -1), # -1 indicates a null key - struct.pack(">i", 2), # Msg length (bytes) - b"v1", # Message contents - - struct.pack(">q", 0), # MsgSet offset - struct.pack(">i", 16), # MsgSet size - struct.pack(">i", -711587208), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", -1), # -1 indicates a null key - struct.pack(">i", 2), # Msg length (bytes) - b"v2", # Message contents - ]) - - self.assertEqual(decoded, expect) - - @unittest.skipUnless(has_snappy(), "Snappy not available") - def test_create_snappy_keyed(self): - payloads = [(b"v1", b"k1"), (b"v2", b"k2")] - msg = create_snappy_message(payloads) - self.assertEqual(msg.magic, 0) - self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY) - self.assertEqual(msg.key, None) - decoded = snappy_decode(msg.value) - expect = b"".join([ - struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", 1474775406), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - b"k1", # Key - struct.pack(">i", 2), # Length of value - b"v1", # Value - - struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", -16383415), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - b"k2", # Key - struct.pack(">i", 2), # Length of value - b"v2", # Value - ]) - - self.assertEqual(decoded, expect) - - def test_encode_message_header(self): - expect = b"".join([ - struct.pack(">h", 10), # API Key - struct.pack(">h", 0), # API Version - struct.pack(">i", 4), # Correlation Id - struct.pack(">h", len("client3")), # Length of clientId - b"client3", # ClientId - ]) - - encoded = KafkaProtocol._encode_message_header(b"client3", 4, 10) - self.assertEqual(encoded, expect) - - def test_encode_message(self): - message = create_message(b"test", b"key") - encoded = KafkaProtocol._encode_message(message) - expect = b"".join([ - struct.pack(">i", -1427009701), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 3), # Length of key - b"key", # key - struct.pack(">i", 4), # Length of value - b"test", # value - ]) - - self.assertEqual(encoded, expect) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_message(self): - encoded = b"".join([ - struct.pack(">i", -1427009701), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 3), # Length of key - b"key", # key - struct.pack(">i", 4), # Length of value - b"test", # value - ]) - - offset = 10 - (returned_offset, decoded_message) = list(KafkaProtocol._decode_message(encoded, offset))[0] - - self.assertEqual(returned_offset, offset) - self.assertEqual(decoded_message, create_message(b"test", b"key")) - - def test_encode_message_failure(self): - with self.assertRaises(ProtocolError): - KafkaProtocol._encode_message(Message(1, 0, "key", "test")) - - @unittest.skip('needs updating for new protocol classes') - def test_encode_message_set(self): - message_set = [ - create_message(b"v1", b"k1"), - create_message(b"v2", b"k2") - ] - - encoded = KafkaProtocol._encode_message_set(message_set) - expect = b"".join([ - struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", 1474775406), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - b"k1", # Key - struct.pack(">i", 2), # Length of value - b"v1", # Value - - struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", -16383415), # 
CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - b"k2", # Key - struct.pack(">i", 2), # Length of value - b"v2", # Value - ]) - - self.assertEqual(encoded, expect) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_message_set(self): - encoded = b"".join([ - struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", 1474775406), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - b"k1", # Key - struct.pack(">i", 2), # Length of value - b"v1", # Value - - struct.pack(">q", 1), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", -16383415), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - b"k2", # Key - struct.pack(">i", 2), # Length of value - b"v2", # Value - ]) - - msgs = list(KafkaProtocol._decode_message_set_iter(encoded)) - self.assertEqual(len(msgs), 2) - msg1, msg2 = msgs - - returned_offset1, decoded_message1 = msg1 - returned_offset2, decoded_message2 = msg2 - - self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1", b"k1")) - - self.assertEqual(returned_offset2, 1) - self.assertEqual(decoded_message2, create_message(b"v2", b"k2")) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_message_gzip(self): - gzip_encoded = (b'\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000' - b'\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01' - b'\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8' - b'\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00' - b'\x00') - offset = 11 - messages = list(KafkaProtocol._decode_message(gzip_encoded, offset)) - - self.assertEqual(len(messages), 2) - msg1, msg2 = messages - - returned_offset1, decoded_message1 = msg1 - self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1")) - - returned_offset2, decoded_message2 = msg2 - self.assertEqual(returned_offset2, 0) - self.assertEqual(decoded_message2, create_message(b"v2")) - - @unittest.skip('needs updating for new protocol classes') - @unittest.skipUnless(has_snappy(), "Snappy not available") - def test_decode_message_snappy(self): - snappy_encoded = (b'\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00' - b'\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff' - b'\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5' - b'\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2') - offset = 11 - messages = list(KafkaProtocol._decode_message(snappy_encoded, offset)) - self.assertEqual(len(messages), 2) - - msg1, msg2 = messages - - returned_offset1, decoded_message1 = msg1 - self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1, create_message(b"v1")) - - returned_offset2, decoded_message2 = msg2 - self.assertEqual(returned_offset2, 0) - self.assertEqual(decoded_message2, create_message(b"v2")) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_message_checksum_error(self): - invalid_encoded_message = b"This is not a valid encoded message" - iter = KafkaProtocol._decode_message(invalid_encoded_message, 0) - self.assertRaises(ChecksumError, list, iter) - - # NOTE: The error handling in _decode_message_set_iter() is questionable. - # If it's modified, the next two tests might need to be fixed. 
- @unittest.skip('needs updating for new protocol classes') - def test_decode_message_set_fetch_size_too_small(self): - with self.assertRaises(ConsumerFetchSizeTooSmall): - list(KafkaProtocol._decode_message_set_iter('a')) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_message_set_stop_iteration(self): - encoded = b"".join([ - struct.pack(">q", 0), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", 1474775406), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - b"k1", # Key - struct.pack(">i", 2), # Length of value - b"v1", # Value - - struct.pack(">q", 1), # MsgSet Offset - struct.pack(">i", 18), # Msg Size - struct.pack(">i", -16383415), # CRC - struct.pack(">bb", 0, 0), # Magic, flags - struct.pack(">i", 2), # Length of key - b"k2", # Key - struct.pack(">i", 2), # Length of value - b"v2", # Value - b"@1$%(Y!", # Random padding - ]) - - msgs = MessageSet.decode(io.BytesIO(encoded)) - self.assertEqual(len(msgs), 2) - msg1, msg2 = msgs - - returned_offset1, msg_size1, decoded_message1 = msg1 - returned_offset2, msg_size2, decoded_message2 = msg2 - - self.assertEqual(returned_offset1, 0) - self.assertEqual(decoded_message1.value, b"v1") - self.assertEqual(decoded_message1.key, b"k1") - - self.assertEqual(returned_offset2, 1) - self.assertEqual(decoded_message2.value, b"v2") - self.assertEqual(decoded_message2.key, b"k2") - - @unittest.skip('needs updating for new protocol classes') - def test_encode_produce_request(self): - requests = [ - ProduceRequestPayload("topic1", 0, [ - kafka.protocol.message.Message(b"a"), - kafka.protocol.message.Message(b"b") - ]), - ProduceRequestPayload("topic2", 1, [ - kafka.protocol.message.Message(b"c") - ]) - ] - - msg_a_binary = KafkaProtocol._encode_message(create_message(b"a")) - msg_b_binary = KafkaProtocol._encode_message(create_message(b"b")) - msg_c_binary = KafkaProtocol._encode_message(create_message(b"c")) - - header = b"".join([ - struct.pack('>i', 0x94), # The length of the message overall - struct.pack('>h', 0), # Msg Header, Message type = Produce - struct.pack('>h', 0), # Msg Header, API version - struct.pack('>i', 2), # Msg Header, Correlation ID - struct.pack('>h7s', 7, b"client1"), # Msg Header, The client ID - struct.pack('>h', 2), # Num acks required - struct.pack('>i', 100), # Request Timeout - struct.pack('>i', 2), # The number of requests - ]) - - total_len = len(msg_a_binary) + len(msg_b_binary) - topic1 = b"".join([ - struct.pack('>h6s', 6, b'topic1'), # The topic1 - struct.pack('>i', 1), # One message set - struct.pack('>i', 0), # Partition 0 - struct.pack('>i', total_len + 24), # Size of the incoming message set - struct.pack('>q', 0), # No offset specified - struct.pack('>i', len(msg_a_binary)), # Length of message - msg_a_binary, # Actual message - struct.pack('>q', 0), # No offset specified - struct.pack('>i', len(msg_b_binary)), # Length of message - msg_b_binary, # Actual message - ]) - - topic2 = b"".join([ - struct.pack('>h6s', 6, b'topic2'), # The topic1 - struct.pack('>i', 1), # One message set - struct.pack('>i', 1), # Partition 1 - struct.pack('>i', len(msg_c_binary) + 12), # Size of the incoming message set - struct.pack('>q', 0), # No offset specified - struct.pack('>i', len(msg_c_binary)), # Length of message - msg_c_binary, # Actual message - ]) - - expected1 = b"".join([ header, topic1, topic2 ]) - expected2 = b"".join([ header, topic2, topic1 ]) - - encoded = KafkaProtocol.encode_produce_request(b"client1", 2, requests, 2, 
100) - self.assertIn(encoded, [ expected1, expected2 ]) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_produce_response(self): - t1 = b"topic1" - t2 = b"topic2" - _long = int - if six.PY2: - _long = long - encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)), - 2, 2, len(t1), t1, 2, 0, 0, _long(10), 1, 1, _long(20), - len(t2), t2, 1, 0, 0, _long(30)) - responses = list(KafkaProtocol.decode_produce_response(encoded)) - self.assertEqual(responses, - [ProduceResponse(t1, 0, 0, _long(10)), - ProduceResponse(t1, 1, 1, _long(20)), - ProduceResponse(t2, 0, 0, _long(30))]) - - @unittest.skip('needs updating for new protocol classes') - def test_encode_fetch_request(self): - requests = [ - FetchRequest(b"topic1", 0, 10, 1024), - FetchRequest(b"topic2", 1, 20, 100), - ] - - header = b"".join([ - struct.pack('>i', 89), # The length of the message overall - struct.pack('>h', 1), # Msg Header, Message type = Fetch - struct.pack('>h', 0), # Msg Header, API version - struct.pack('>i', 3), # Msg Header, Correlation ID - struct.pack('>h7s', 7, b"client1"),# Msg Header, The client ID - struct.pack('>i', -1), # Replica Id - struct.pack('>i', 2), # Max wait time - struct.pack('>i', 100), # Min bytes - struct.pack('>i', 2), # Num requests - ]) - - topic1 = b"".join([ - struct.pack('>h6s', 6, b'topic1'),# Topic - struct.pack('>i', 1), # Num Payloads - struct.pack('>i', 0), # Partition 0 - struct.pack('>q', 10), # Offset - struct.pack('>i', 1024), # Max Bytes - ]) - - topic2 = b"".join([ - struct.pack('>h6s', 6, b'topic2'),# Topic - struct.pack('>i', 1), # Num Payloads - struct.pack('>i', 1), # Partition 0 - struct.pack('>q', 20), # Offset - struct.pack('>i', 100), # Max Bytes - ]) - - expected1 = b"".join([ header, topic1, topic2 ]) - expected2 = b"".join([ header, topic2, topic1 ]) - - encoded = KafkaProtocol.encode_fetch_request(b"client1", 3, requests, 2, 100) - self.assertIn(encoded, [ expected1, expected2 ]) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_fetch_response(self): - t1 = b"topic1" - t2 = b"topic2" - msgs = [create_message(msg) - for msg in [b"message1", b"hi", b"boo", b"foo", b"so fun!"]] - ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]]) - ms2 = KafkaProtocol._encode_message_set([msgs[2]]) - ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]]) - - encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' % - (len(t1), len(ms1), len(ms2), len(t2), len(ms3)), - 4, 2, len(t1), t1, 2, 0, 0, 10, len(ms1), ms1, 1, - 1, 20, len(ms2), ms2, len(t2), t2, 1, 0, 0, 30, - len(ms3), ms3) - - responses = list(KafkaProtocol.decode_fetch_response(encoded)) - def expand_messages(response): - return FetchResponsePayload(response.topic, response.partition, - response.error, response.highwaterMark, - list(response.messages)) - - expanded_responses = list(map(expand_messages, responses)) - expect = [FetchResponsePayload(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]), - OffsetAndMessage(0, msgs[1])]), - FetchResponsePayload(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]), - FetchResponsePayload(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]), - OffsetAndMessage(0, msgs[4])])] - self.assertEqual(expanded_responses, expect) - - @unittest.skip('needs updating for new protocol classes') - def test_encode_metadata_request_no_topics(self): - expected = b"".join([ - struct.pack(">i", 17), # Total length of the request - struct.pack('>h', 3), # API key metadata fetch - struct.pack('>h', 0), # API version - struct.pack('>i', 4), # 
Correlation ID - struct.pack('>h3s', 3, b"cid"),# The client ID - struct.pack('>i', 0), # No topics, give all the data! - ]) - - encoded = KafkaProtocol.encode_metadata_request(b"cid", 4) - - self.assertEqual(encoded, expected) - - @unittest.skip('needs updating for new protocol classes') - def test_encode_metadata_request_with_topics(self): - expected = b"".join([ - struct.pack(">i", 25), # Total length of the request - struct.pack('>h', 3), # API key metadata fetch - struct.pack('>h', 0), # API version - struct.pack('>i', 4), # Correlation ID - struct.pack('>h3s', 3, b"cid"),# The client ID - struct.pack('>i', 2), # Number of topics in the request - struct.pack('>h2s', 2, b"t1"), # Topic "t1" - struct.pack('>h2s', 2, b"t2"), # Topic "t2" - ]) - - encoded = KafkaProtocol.encode_metadata_request(b"cid", 4, [b"t1", b"t2"]) - - self.assertEqual(encoded, expected) - - def _create_encoded_metadata_response(self, brokers, topics): - encoded = [] - encoded.append(struct.pack('>ii', 3, len(brokers))) - for broker in brokers: - encoded.append(struct.pack('>ih%dsi' % len(broker.host), - broker.nodeId, len(broker.host), - broker.host, broker.port)) - - encoded.append(struct.pack('>i', len(topics))) - for topic in topics: - encoded.append(struct.pack('>hh%dsi' % len(topic.topic), - topic.error, len(topic.topic), - topic.topic, len(topic.partitions))) - for metadata in topic.partitions: - encoded.append(struct.pack('>hiii', metadata.error, - metadata.partition, metadata.leader, - len(metadata.replicas))) - if len(metadata.replicas) > 0: - encoded.append(struct.pack('>%di' % len(metadata.replicas), - *metadata.replicas)) - - encoded.append(struct.pack('>i', len(metadata.isr))) - if len(metadata.isr) > 0: - encoded.append(struct.pack('>%di' % len(metadata.isr), - *metadata.isr)) - return b''.join(encoded) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_metadata_response(self): - node_brokers = [ - BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000), - BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001), - BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000) - ] - - ''' - topic_partitions = [ - TopicMetadata(b"topic1", 0, [ - PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0), - PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1) - ]), - TopicMetadata(b"topic2", 1, [ - PartitionMetadata(b"topic2", 0, 0, (), (), 0), - ]), - ] - encoded = self._create_encoded_metadata_response(node_brokers, - topic_partitions) - decoded = KafkaProtocol.decode_metadata_response(encoded) - self.assertEqual(decoded, (node_brokers, topic_partitions)) - ''' - - def test_encode_consumer_metadata_request(self): - expected = b"".join([ - struct.pack(">i", 17), # Total length of the request - struct.pack('>h', 10), # API key consumer metadata - struct.pack('>h', 0), # API version - struct.pack('>i', 4), # Correlation ID - struct.pack('>h3s', 3, b"cid"),# The client ID - struct.pack('>h2s', 2, b"g1"), # Group "g1" - ]) - - encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1") - - self.assertEqual(encoded, expected) - - def test_decode_consumer_metadata_response(self): - encoded = b"".join([ - struct.pack(">i", 42), # Correlation ID - struct.pack(">h", 0), # No Error - struct.pack(">i", 1), # Broker ID - struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"), # Broker Host - struct.pack(">i", 1000), # Broker Port - ]) - - results = KafkaProtocol.decode_consumer_metadata_response(encoded) - self.assertEqual(results, - ConsumerMetadataResponse(error = 0, nodeId = 1, host = 
b'brokers1.kafka.rdio.com', port = 1000) - ) - - @unittest.skip('needs updating for new protocol classes') - def test_encode_offset_request(self): - expected = b"".join([ - struct.pack(">i", 21), # Total length of the request - struct.pack('>h', 2), # Message type = offset fetch - struct.pack('>h', 0), # API version - struct.pack('>i', 4), # Correlation ID - struct.pack('>h3s', 3, b"cid"), # The client ID - struct.pack('>i', -1), # Replica Id - struct.pack('>i', 0), # No topic/partitions - ]) - - encoded = KafkaProtocol.encode_offset_request(b"cid", 4) - - self.assertEqual(encoded, expected) - - @unittest.skip('needs updating for new protocol classes') - def test_encode_offset_request__no_payload(self): - expected = b"".join([ - struct.pack(">i", 65), # Total length of the request - - struct.pack('>h', 2), # Message type = offset fetch - struct.pack('>h', 0), # API version - struct.pack('>i', 4), # Correlation ID - struct.pack('>h3s', 3, b"cid"), # The client ID - struct.pack('>i', -1), # Replica Id - struct.pack('>i', 1), # Num topics - struct.pack(">h6s", 6, b"topic1"),# Topic for the request - struct.pack(">i", 2), # Two partitions - - struct.pack(">i", 3), # Partition 3 - struct.pack(">q", -1), # No time offset - struct.pack(">i", 1), # One offset requested - - struct.pack(">i", 4), # Partition 3 - struct.pack(">q", -1), # No time offset - struct.pack(">i", 1), # One offset requested - ]) - - encoded = KafkaProtocol.encode_offset_request(b"cid", 4, [ - OffsetRequest(b'topic1', 3, -1, 1), - OffsetRequest(b'topic1', 4, -1, 1), - ]) - - self.assertEqual(encoded, expected) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_offset_response(self): - encoded = b"".join([ - struct.pack(">i", 42), # Correlation ID - struct.pack(">i", 1), # One topics - struct.pack(">h6s", 6, b"topic1"),# First topic - struct.pack(">i", 2), # Two partitions - - struct.pack(">i", 2), # Partition 2 - struct.pack(">h", 0), # No error - struct.pack(">i", 1), # One offset - struct.pack(">q", 4), # Offset 4 - - struct.pack(">i", 4), # Partition 4 - struct.pack(">h", 0), # No error - struct.pack(">i", 1), # One offset - struct.pack(">q", 8), # Offset 8 - ]) - - results = KafkaProtocol.decode_offset_response(encoded) - self.assertEqual(set(results), set([ - OffsetResponse(topic = b'topic1', partition = 2, error = 0, offsets=(4,)), - OffsetResponse(topic = b'topic1', partition = 4, error = 0, offsets=(8,)), - ])) - - @unittest.skip('needs updating for new protocol classes') - def test_encode_offset_commit_request(self): - header = b"".join([ - struct.pack('>i', 99), # Total message length - - struct.pack('>h', 8), # Message type = offset commit - struct.pack('>h', 0), # API version - struct.pack('>i', 42), # Correlation ID - struct.pack('>h9s', 9, b"client_id"),# The client ID - struct.pack('>h8s', 8, b"group_id"), # The group to commit for - struct.pack('>i', 2), # Num topics - ]) - - topic1 = b"".join([ - struct.pack(">h6s", 6, b"topic1"), # Topic for the request - struct.pack(">i", 2), # Two partitions - struct.pack(">i", 0), # Partition 0 - struct.pack(">q", 123), # Offset 123 - struct.pack(">h", -1), # Null metadata - struct.pack(">i", 1), # Partition 1 - struct.pack(">q", 234), # Offset 234 - struct.pack(">h", -1), # Null metadata - ]) - - topic2 = b"".join([ - struct.pack(">h6s", 6, b"topic2"), # Topic for the request - struct.pack(">i", 1), # One partition - struct.pack(">i", 2), # Partition 2 - struct.pack(">q", 345), # Offset 345 - struct.pack(">h", -1), # Null metadata - ]) - - 
expected1 = b"".join([ header, topic1, topic2 ]) - expected2 = b"".join([ header, topic2, topic1 ]) - - encoded = KafkaProtocol.encode_offset_commit_request(b"client_id", 42, b"group_id", [ - OffsetCommitRequest(b"topic1", 0, 123, None), - OffsetCommitRequest(b"topic1", 1, 234, None), - OffsetCommitRequest(b"topic2", 2, 345, None), - ]) - - self.assertIn(encoded, [ expected1, expected2 ]) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_offset_commit_response(self): - encoded = b"".join([ - struct.pack(">i", 42), # Correlation ID - struct.pack(">i", 1), # One topic - struct.pack(">h6s", 6, b"topic1"),# First topic - struct.pack(">i", 2), # Two partitions - - struct.pack(">i", 2), # Partition 2 - struct.pack(">h", 0), # No error - - struct.pack(">i", 4), # Partition 4 - struct.pack(">h", 0), # No error - ]) - - results = KafkaProtocol.decode_offset_commit_response(encoded) - self.assertEqual(set(results), set([ - OffsetCommitResponse(topic = b'topic1', partition = 2, error = 0), - OffsetCommitResponse(topic = b'topic1', partition = 4, error = 0), - ])) - - @unittest.skip('needs updating for new protocol classes') - def test_encode_offset_fetch_request(self): - header = b"".join([ - struct.pack('>i', 69), # Total message length - struct.pack('>h', 9), # Message type = offset fetch - struct.pack('>h', 0), # API version - struct.pack('>i', 42), # Correlation ID - struct.pack('>h9s', 9, b"client_id"),# The client ID - struct.pack('>h8s', 8, b"group_id"), # The group to commit for - struct.pack('>i', 2), # Num topics - ]) - - topic1 = b"".join([ - struct.pack(">h6s", 6, b"topic1"), # Topic for the request - struct.pack(">i", 2), # Two partitions - struct.pack(">i", 0), # Partition 0 - struct.pack(">i", 1), # Partition 1 - ]) - - topic2 = b"".join([ - struct.pack(">h6s", 6, b"topic2"), # Topic for the request - struct.pack(">i", 1), # One partitions - struct.pack(">i", 2), # Partition 2 - ]) - - expected1 = b"".join([ header, topic1, topic2 ]) - expected2 = b"".join([ header, topic2, topic1 ]) - - encoded = KafkaProtocol.encode_offset_fetch_request(b"client_id", 42, b"group_id", [ - OffsetFetchRequest(b"topic1", 0), - OffsetFetchRequest(b"topic1", 1), - OffsetFetchRequest(b"topic2", 2), - ]) - - self.assertIn(encoded, [ expected1, expected2 ]) - - @unittest.skip('needs updating for new protocol classes') - def test_decode_offset_fetch_response(self): - encoded = b"".join([ - struct.pack(">i", 42), # Correlation ID - struct.pack(">i", 1), # One topics - struct.pack(">h6s", 6, b"topic1"),# First topic - struct.pack(">i", 2), # Two partitions - - struct.pack(">i", 2), # Partition 2 - struct.pack(">q", 4), # Offset 4 - struct.pack(">h4s", 4, b"meta"), # Metadata - struct.pack(">h", 0), # No error - - struct.pack(">i", 4), # Partition 4 - struct.pack(">q", 8), # Offset 8 - struct.pack(">h4s", 4, b"meta"), # Metadata - struct.pack(">h", 0), # No error - ]) - - results = KafkaProtocol.decode_offset_fetch_response(encoded) - self.assertEqual(set(results), set([ - OffsetFetchResponse(topic = b'topic1', partition = 2, offset = 4, error = 0, metadata = b"meta"), - OffsetFetchResponse(topic = b'topic1', partition = 4, offset = 8, error = 0, metadata = b"meta"), - ])) - - @contextmanager - def mock_create_message_fns(self): - import kafka.protocol - with patch.object(kafka.protocol.legacy, "create_message", - return_value=sentinel.message): - with patch.object(kafka.protocol.legacy, "create_gzip_message", - return_value=sentinel.gzip_message): - with 
patch.object(kafka.protocol.legacy, "create_snappy_message",
-                                  return_value=sentinel.snappy_message):
-                    yield
-
-    def test_create_message_set(self):
-        messages = [(1, "k1"), (2, "k2"), (3, "k3")]
-
-        # Default codec is CODEC_NONE. Expect list of regular messages.
-        expect = [sentinel.message] * len(messages)
-        with self.mock_create_message_fns():
-            message_set = create_message_set(messages)
-        self.assertEqual(message_set, expect)
-
-        # CODEC_NONE: Expect list of regular messages.
-        expect = [sentinel.message] * len(messages)
-        with self.mock_create_message_fns():
-            message_set = create_message_set(messages, CODEC_NONE)
-        self.assertEqual(message_set, expect)
-
-        # CODEC_GZIP: Expect list of one gzip-encoded message.
-        expect = [sentinel.gzip_message]
-        with self.mock_create_message_fns():
-            message_set = create_message_set(messages, CODEC_GZIP)
-        self.assertEqual(message_set, expect)
-
-        # CODEC_SNAPPY: Expect list of one snappy-encoded message.
-        expect = [sentinel.snappy_message]
-        with self.mock_create_message_fns():
-            message_set = create_message_set(messages, CODEC_SNAPPY)
-        self.assertEqual(message_set, expect)
-
-        # Unknown codec should raise UnsupportedCodecError.
-        with self.assertRaises(UnsupportedCodecError):
-            create_message_set(messages, -1)
diff --git a/test/test_util.py b/test/test_util.py
deleted file mode 100644
index a4dbaa5..0000000
--- a/test/test_util.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-import struct
-
-from kafka.vendor import six
-from . import unittest
-
-import kafka.errors
-import kafka.structs
-import kafka.util
-
-
-class UtilTest(unittest.TestCase):
-    @unittest.skip("Unwritten")
-    def test_relative_unpack(self):
-        pass
-
-    def test_write_int_string(self):
-        self.assertEqual(
-            kafka.util.write_int_string(b'some string'),
-            b'\x00\x00\x00\x0bsome string'
-        )
-
-    def test_write_int_string__unicode(self):
-        with self.assertRaises(TypeError) as cm:
-            kafka.util.write_int_string(u'unicode')
-        #: :type: TypeError
-        te = cm.exception
-        if six.PY2:
-            self.assertIn('unicode', str(te))
-        else:
-            self.assertIn('str', str(te))
-        self.assertIn('to be bytes', str(te))
-
-    def test_write_int_string__empty(self):
-        self.assertEqual(
-            kafka.util.write_int_string(b''),
-            b'\x00\x00\x00\x00'
-        )
-
-    def test_write_int_string__null(self):
-        self.assertEqual(
-            kafka.util.write_int_string(None),
-            b'\xff\xff\xff\xff'
-        )
-
-    def test_read_short_string(self):
-        self.assertEqual(kafka.util.read_short_string(b'\xff\xff', 0), (None, 2))
-        self.assertEqual(kafka.util.read_short_string(b'\x00\x00', 0), (b'', 2))
-        self.assertEqual(kafka.util.read_short_string(b'\x00\x0bsome string', 0), (b'some string', 13))
-
-    def test_relative_unpack2(self):
-        self.assertEqual(
-            kafka.util.relative_unpack('>hh', b'\x00\x01\x00\x00\x02', 0),
-            ((1, 0), 4)
-        )
-
-    def test_relative_unpack3(self):
-        with self.assertRaises(kafka.errors.BufferUnderflowError):
-            kafka.util.relative_unpack('>hh', '\x00', 0)
-
-    def test_group_by_topic_and_partition(self):
-        t = kafka.structs.TopicPartition
-
-        l = [
-            t("a", 1),
-            t("a", 2),
-            t("a", 3),
-            t("b", 3),
-        ]
-
-        self.assertEqual(kafka.util.group_by_topic_and_partition(l), {
-            "a": {
-                1: t("a", 1),
-                2: t("a", 2),
-                3: t("a", 3),
-            },
-            "b": {
-                3: t("b", 3),
-            }
-        })
-
-        # should not be able to group duplicate topic-partitions
-        t1 = t("a", 1)
-        with self.assertRaises(AssertionError):
-            kafka.util.group_by_topic_and_partition([t1, t1])
diff --git a/test/testutil.py b/test/testutil.py
index 650f9bf..77a6673 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -4,18 +4,6 @@ import os
 import random
 import string
 import time
-import uuid
-
-import pytest
-from . import unittest
-
-from kafka import SimpleClient
-from kafka.errors import (
-    LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
-    NotLeaderForPartitionError, UnknownTopicOrPartitionError,
-    FailedPayloadsError
-)
-from kafka.structs import OffsetRequestPayload
 
 
 def random_string(length):
@@ -32,21 +20,6 @@ def env_kafka_version():
     return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
 
 
-def current_offset(client, topic, partition, kafka_broker=None):
-    """Get the current offset of a topic's partition
-    """
-    try:
-        offsets, = client.send_offset_request([OffsetRequestPayload(topic,
-                                                                     partition, -1, 1)])
-    except Exception:
-        # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
-        if kafka_broker:
-            kafka_broker.dump_logs()
-        raise
-    else:
-        return offsets.offsets[0]
-
-
 def assert_message_count(messages, num_messages):
     """Check that we received the expected number of messages with no duplicates."""
     # Make sure we got them all
@@ -58,84 +31,6 @@ def assert_message_count(messages, num_messages):
     assert len(unique_messages) == num_messages
 
 
-class KafkaIntegrationTestCase(unittest.TestCase):
-    create_client = True
-    topic = None
-    zk = None
-    server = None
-
-    def setUp(self):
-        super(KafkaIntegrationTestCase, self).setUp()
-        if not os.environ.get('KAFKA_VERSION'):
-            self.skipTest('Integration test requires KAFKA_VERSION')
-
-        if not self.topic:
-            topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
-            self.topic = topic
-
-        if self.create_client:
-            self.client = SimpleClient('%s:%d' % (self.server.host, self.server.port))
-
-        timeout = time.time() + 30
-        while time.time() < timeout:
-            try:
-                self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False)
-                if self.client.has_metadata_for_topic(topic):
-                    break
-            except (LeaderNotAvailableError, InvalidTopicError):
-                time.sleep(1)
-        else:
-            raise KafkaTimeoutError('Timeout loading topic metadata!')
-
-
-        # Ensure topic partitions have been created on all brokers to avoid UnknownPartitionErrors
-        # TODO: It might be a good idea to move this to self.client.ensure_topic_exists
-        for partition in self.client.get_partition_ids_for_topic(self.topic):
-            while True:
-                try:
-                    req = OffsetRequestPayload(self.topic, partition, -1, 100)
-                    self.client.send_offset_request([req])
-                    break
-                except (NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError) as e:
-                    if time.time() > timeout:
-                        raise KafkaTimeoutError('Timeout loading topic metadata!')
-                    time.sleep(.1)
-
-        self._messages = {}
-
-    def tearDown(self):
-        super(KafkaIntegrationTestCase, self).tearDown()
-        if not os.environ.get('KAFKA_VERSION'):
-            return
-
-        if self.create_client:
-            self.client.close()
-
-    def current_offset(self, topic, partition):
-        try:
-            offsets, = self.client.send_offset_request([OffsetRequestPayload(topic,
-                                                                              partition, -1, 1)])
-        except Exception:
-            # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
-            self.zk.child.dump_logs()
-            self.server.child.dump_logs()
-            raise
-        else:
-            return offsets.offsets[0]
-
-    def msgs(self, iterable):
-        return [self.msg(x) for x in iterable]
-
-    def msg(self, s):
-        if s not in self._messages:
-            self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4()))
-
-        return self._messages[s].encode('utf-8')
-
-    def key(self, k):
-        return k.encode('utf-8')
-
-
 class Timer(object):
     def __enter__(self):
         self.start = time.time()
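
Note (not part of the diff): the removed current_offset() helper wrapped SimpleClient's send_offset_request. A minimal, untested sketch of the same lookup with the KafkaConsumer API that the remaining fixtures use; the function name and bootstrap_servers parameter here are illustrative, not code from this commit.

    # Illustrative sketch only, assuming a reachable broker.
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition

    def current_offset(bootstrap_servers, topic, partition):
        """Return the log-end (latest) offset of a topic partition."""
        consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
        try:
            tp = TopicPartition(topic, partition)
            # end_offsets() queries the broker for the log end offset, the same
            # value the old OffsetRequestPayload(topic, partition, -1, 1) returned.
            return consumer.end_offsets([tp])[tp]
        finally:
            consumer.close()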