summaryrefslogtreecommitdiff
path: root/test/test_client_integration.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-06-10 02:39:05 -0700
committerDana Powers <dana.powers@gmail.com>2015-06-10 02:39:05 -0700
commit8e3cd1ca2801ed13522d177305898c29a3ebfd9b (patch)
tree5b2dc7ff857007e6ecf660d1f9b31ee393a16e06 /test/test_client_integration.py
parent4c9a3c6b9dac952154cdab2e11892bff240f9c91 (diff)
parent66b6b4aa6ee7c4461a4e43b2512e76ba3f04230f (diff)
downloadkafka-python-8e3cd1ca2801ed13522d177305898c29a3ebfd9b.tar.gz
Merge pull request #403 from dpkp/client_request_response_ordering
Client request response ordering
Diffstat (limited to 'test/test_client_integration.py')
-rw-r--r--test/test_client_integration.py32
1 files changed, 31 insertions, 1 deletions
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
index 585123b..a6ea8f7 100644
--- a/test/test_client_integration.py
+++ b/test/test_client_integration.py
@@ -2,8 +2,9 @@ import os
from kafka.common import (
FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
- KafkaTimeoutError
+ KafkaTimeoutError, ProduceRequest
)
+from kafka.protocol import create_message
from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
@@ -49,6 +50,35 @@ class TestKafkaClientIntegration(KafkaIntegrationTestCase):
with self.assertRaises(KafkaTimeoutError):
self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
+ @kafka_versions('all')
+ def test_send_produce_request_maintains_request_response_order(self):
+
+ self.client.ensure_topic_exists(b'foo', timeout=1)
+ self.client.ensure_topic_exists(b'bar', timeout=1)
+
+ requests = [
+ ProduceRequest(
+ b'foo', 0,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'bar', 1,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'foo', 1,
+ [create_message(b'a'), create_message(b'b')]),
+ ProduceRequest(
+ b'bar', 0,
+ [create_message(b'a'), create_message(b'b')]),
+ ]
+
+ responses = self.client.send_produce_request(requests)
+ while len(responses):
+ request = requests.pop()
+ response = responses.pop()
+ self.assertEqual(request.topic, response.topic)
+ self.assertEqual(request.partition, response.partition)
+
+
####################
# Offset Tests #
####################