diff options
Diffstat (limited to 'kafka/consumer.py')
-rw-r--r-- | kafka/consumer.py | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py index 57b5b97..bead1dd 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -22,6 +22,7 @@ AUTO_COMMIT_INTERVAL = 5000 FETCH_DEFAULT_BLOCK_TIMEOUT = 1 FETCH_MAX_WAIT_TIME = 100 FETCH_MIN_BYTES = 4096 +FETCH_BUFFER_SIZE_BYTES = 4096 class FetchContext(object): @@ -216,8 +217,10 @@ class SimpleConsumer(Consumer): def __init__(self, client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL, - fetch_size_bytes=FETCH_MIN_BYTES): + fetch_size_bytes=FETCH_MIN_BYTES, + buffer_size=FETCH_BUFFER_SIZE_BYTES): + self.buffer_size = buffer_size self.partition_info = False # Do not return partition info in msgs self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME self.fetch_min_bytes = fetch_size_bytes @@ -364,7 +367,7 @@ class SimpleConsumer(Consumer): # use MaxBytes = client's bufsize since we're only # fetching one topic + partition req = FetchRequest( - self.topic, partition, offset, self.client.bufsize) + self.topic, partition, offset, self.buffer_size) (resp,) = self.client.send_fetch_request( [req], |