summaryrefslogtreecommitdiff
path: root/kafka/consumer/kafka.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-09 15:37:17 -0800
committerDana Powers <dana.powers@rd.io>2015-12-09 15:44:15 -0800
commita3ec9bd8e8c730c9f6715b536c0c590230fc2e28 (patch)
treeeaebf6dc87ffb83d7256497355c5559f5eec5d72 /kafka/consumer/kafka.py
parentad030ccd4df57305bb624b03eddaa2641f956160 (diff)
downloadkafka-python-a3ec9bd8e8c730c9f6715b536c0c590230fc2e28.tar.gz
Update references to kafka.common Request/Response (now Payload)
Diffstat (limited to 'kafka/consumer/kafka.py')
-rw-r--r--kafka/consumer/kafka.py11
1 files changed, 6 insertions, 5 deletions
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 3ef106c..1bd3def 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -11,7 +11,8 @@ import six
from kafka.client import KafkaClient
from kafka.common import (
- OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
+ OffsetFetchRequest, OffsetCommitRequest,
+ OffsetRequestPayload, FetchRequestPayload,
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
@@ -333,9 +334,9 @@ class KafkaConsumer(object):
'No fetch offsets found when calling fetch_messages'
)
- fetches = [FetchRequest(topic, partition,
- self._offsets.fetch[(topic, partition)],
- max_bytes)
+ fetches = [FetchRequestPayload(topic, partition,
+ self._offsets.fetch[(topic, partition)],
+ max_bytes)
for (topic, partition) in self._topics]
# send_fetch_request will batch topic/partition requests by leader
@@ -425,7 +426,7 @@ class KafkaConsumer(object):
topic / partition. See:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
"""
- reqs = [OffsetRequest(topic, partition, request_time_ms, max_num_offsets)]
+ reqs = [OffsetRequestPayload(topic, partition, request_time_ms, max_num_offsets)]
(resp,) = self._client.send_offset_request(reqs)