summaryrefslogtreecommitdiff
path: root/kafka/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r--kafka/consumer.py7
1 files changed, 5 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 57b5b97..bead1dd 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -22,6 +22,7 @@ AUTO_COMMIT_INTERVAL = 5000
FETCH_DEFAULT_BLOCK_TIMEOUT = 1
FETCH_MAX_WAIT_TIME = 100
FETCH_MIN_BYTES = 4096
+FETCH_BUFFER_SIZE_BYTES = 4096
class FetchContext(object):
@@ -216,8 +217,10 @@ class SimpleConsumer(Consumer):
def __init__(self, client, group, topic, auto_commit=True, partitions=None,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- fetch_size_bytes=FETCH_MIN_BYTES):
+ fetch_size_bytes=FETCH_MIN_BYTES,
+ buffer_size=FETCH_BUFFER_SIZE_BYTES):
+ self.buffer_size = buffer_size
self.partition_info = False # Do not return partition info in msgs
self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
self.fetch_min_bytes = fetch_size_bytes
@@ -364,7 +367,7 @@ class SimpleConsumer(Consumer):
# use MaxBytes = client's bufsize since we're only
# fetching one topic + partition
req = FetchRequest(
- self.topic, partition, offset, self.client.bufsize)
+ self.topic, partition, offset, self.buffer_size)
(resp,) = self.client.send_fetch_request(
[req],