summaryrefslogtreecommitdiff
path: root/test/test_producer_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_producer_integration.py')
-rw-r--r--test/test_producer_integration.py34
1 file changed, 22 insertions(+), 12 deletions(-)
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index 19d3a6d..125df2c 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -7,16 +7,16 @@ from kafka import (
create_message, create_gzip_message, create_snappy_message,
RoundRobinPartitioner, HashedPartitioner
)
+from kafka.codec import has_snappy
from kafka.common import (
FetchRequest, ProduceRequest, UnknownTopicOrPartitionError
)
-from kafka.codec import has_snappy
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
- topic = 'produce_topic'
+ topic = b'produce_topic'
@classmethod
def setUpClass(cls): # noqa
@@ -39,13 +39,15 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request(
- [ create_message("Test message %d" % i) for i in range(100) ],
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
start_offset,
100,
)
self.assert_produce_request(
- [ create_message("Test message %d" % i) for i in range(100) ],
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(100)],
start_offset+100,
100,
)
@@ -55,7 +57,8 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request(
- [ create_message("Test message %d" % i) for i in range(10000) ],
+ [create_message(("Test message %d" % i).encode('utf-8'))
+ for i in range(10000)],
start_offset,
10000,
)
@@ -64,8 +67,10 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
def test_produce_many_gzip(self):
start_offset = self.current_offset(self.topic, 0)
- message1 = create_gzip_message(["Gzipped 1 %d" % i for i in range(100)])
- message2 = create_gzip_message(["Gzipped 2 %d" % i for i in range(100)])
+ message1 = create_gzip_message([
+ ("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)])
+ message2 = create_gzip_message([
+ ("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)])
self.assert_produce_request(
[ message1, message2 ],
@@ -92,8 +97,9 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
msg_count = 1+100
messages = [
- create_message("Just a plain message"),
- create_gzip_message(["Gzipped %d" % i for i in range(100)]),
+ create_message(b"Just a plain message"),
+ create_gzip_message([
+ ("Gzipped %d" % i).encode('utf-8') for i in range(100)]),
]
# All snappy integration tests fail with nosnappyjava
@@ -108,14 +114,18 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
start_offset = self.current_offset(self.topic, 0)
self.assert_produce_request([
- create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ create_gzip_message([
+ ("Gzipped batch 1, message %d" % i).encode('utf-8')
+ for i in range(50000)])
],
start_offset,
50000,
)
self.assert_produce_request([
- create_gzip_message(["Gzipped batch 1, message %d" % i for i in range(50000)])
+ create_gzip_message([
+ ("Gzipped batch 1, message %d" % i).encode('utf-8')
+ for i in range(50000)])
],
start_offset+50000,
50000,
@@ -151,7 +161,7 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
@kafka_versions("all")
def test_produce__new_topic_fails_with_reasonable_error(self):
- new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4()))
+ new_topic = 'new_topic_{guid}'.format(guid = str(uuid.uuid4())).encode('utf-8')
producer = SimpleProducer(self.client)
# At first it doesn't exist