diff options
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r-- | test/test_producer_integration.py | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 19d3a6d..125df2c 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -7,16 +7,16 @@ from kafka import ( create_message, create_gzip_message, create_snappy_message, RoundRobinPartitioner, HashedPartitioner ) +from kafka.codec import has_snappy from kafka.common import ( FetchRequest, ProduceRequest, UnknownTopicOrPartitionError ) -from kafka.codec import has_snappy from test.fixtures import ZookeeperFixture, KafkaFixture from test.testutil import KafkaIntegrationTestCase, kafka_versions class TestKafkaProducerIntegration(KafkaIntegrationTestCase): - topic = 'produce_topic' + topic = b'produce_topic' @classmethod def setUpClass(cls): # noqa @@ -39,13 +39,15 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset = self.current_offset(self.topic, 0) self.assert_produce_request( - [ create_message("Test message %d" % i) for i in range(100) ], + [create_message(("Test message %d" % i).encode('utf-8')) + for i in range(100)], start_offset, 100, ) self.assert_produce_request( - [ create_message("Test message %d" % i) for i in range(100) ], + [create_message(("Test message %d" % i).encode('utf-8')) + for i in range(100)], start_offset+100, 100, ) @@ -55,7 +57,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset = self.current_offset(self.topic, 0) self.assert_produce_request( - [ create_message("Test message %d" % i) for i in range(10000) ], + [create_message(("Test message %d" % i).encode('utf-8')) + for i in range(10000)], start_offset, 10000, ) @@ -64,8 +67,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): def test_produce_many_gzip(self): start_offset = self.current_offset(self.topic, 0) - message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)]) - message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)]) + message1 = create_gzip_message([ + ("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)]) + message2 = create_gzip_message([ + ("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)]) self.assert_produce_request( [ message1, message2 ], @@ -92,8 +97,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): msg_count = 1+100 messages = [ - create_message("Just a plain message"), - create_gzip_message(["Gzipped %d" % i for i in range(100)]), + create_message(b"Just a plain message"), + create_gzip_message([ + ("Gzipped %d" % i).encode('utf-8') for i in range(100)]), ] # All snappy integration tests fail with nosnappyjava @@ -108,14 +114,18 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): start_offset = self.current_offset(self.topic, 0) self.assert_produce_request([ - create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) + create_gzip_message([ + ("Gzipped batch 1, message %d" % i).encode('utf-8') + for i in range(50000)]) ], start_offset, 50000, ) self.assert_produce_request([ - create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)]) + create_gzip_message([ + ("Gzipped batch 1, message %d" % i).encode('utf-8') + for i in range(50000)]) ], start_offset+50000, 50000, @@ -151,7 +161,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): @kafka_versions("all") def test_produce__new_topic_fails_with_reasonable_error(self): - new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())) + new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8') producer = SimpleProducer(self.client) # At first it doesn't exist |