 kafka/client.py   | 10
 kafka/consumer.py | 93
 2 files changed, 87 insertions(+), 16 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..a1c2133 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -221,15 +221,19 @@ class KafkaClient(object):
return out
def send_fetch_request(self, payloads=[], fail_on_error=True,
- callback=None):
+ callback=None, max_wait_time=100, min_bytes=4096):
"""
Encode and send a FetchRequest
Payloads are grouped by topic and partition so they can be pipelined
to the same brokers.
"""
- resps = self._send_broker_aware_request(payloads,
- KafkaProtocol.encode_fetch_request,
+
+ encoder = partial(KafkaProtocol.encode_fetch_request,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes)
+
+ resps = self._send_broker_aware_request(payloads, encoder,
KafkaProtocol.decode_fetch_response)
out = []
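The docstring above notes that payloads are grouped by topic and partition so they can be pipelined to the same brokers, and the new max_wait_time/min_bytes parameters let a caller trade latency for batching. A rough, non-authoritative sketch of how the updated call might be used (broker address, topic, offsets and import locations are assumptions; `partial` is assumed to be imported from `functools` inside kafka/client.py):

    from kafka.client import KafkaClient
    from kafka.common import FetchRequest   # import location assumed

    client = KafkaClient("localhost", 9092)  # constructor details assumed

    # Two partitions of the same topic; the client groups these per broker
    # and sends them as pipelined fetch requests.
    reqs = [FetchRequest("my-topic", 0, 0, 1024),
            FetchRequest("my-topic", 1, 0, 1024)]

    # Ask each broker to wait up to 100 ms, or until at least 4096 bytes
    # of messages are available, before responding.
    resps = client.send_fetch_request(reqs, max_wait_time=100, min_bytes=4096)
    for resp in resps:
        for message in resp.messages:
            print(resp.topic, resp.partition, message.offset)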
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 2b77d00..c0906ad 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -17,6 +17,37 @@ log = logging.getLogger("kafka")
AUTO_COMMIT_MSG_COUNT = 100
AUTO_COMMIT_INTERVAL = 5000
+FETCH_DEFAULT_BLOCK_TIMEOUT = 1
+FETCH_MAX_WAIT_TIME = 100
+FETCH_MIN_BYTES = 4096
+
+
+class FetchContext(object):
+ """
+ Class for managing the state of a consumer during fetch
+ """
+ def __init__(self, consumer, block, timeout):
+ self.consumer = consumer
+ self.block = block
+
+ if block and not timeout:
+ timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+
+ # timeout may legitimately be None when not blocking
+ self.timeout = timeout * 1000 if timeout is not None else None
+
+ def __enter__(self):
+ """Set fetch values based on blocking status"""
+ if self.block:
+ self.consumer.fetch_max_wait_time = self.timeout
+ self.consumer.fetch_min_bytes = 1
+ else:
+ self.consumer.fetch_min_bytes = 0
+
+ def __exit__(self, type, value, traceback):
+ """Reset values to default"""
+ self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+ self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
+
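FetchContext is used as a context manager around a single fetch so that the consumer's fetch tuning is only altered for the duration of that call; get_messages() below wraps its iteration in exactly this way. A minimal sketch of the pattern, assuming a consumer object that exposes fetch_max_wait_time and fetch_min_bytes (as SimpleConsumer does further down):

    # Blocking read: max_wait_time becomes ~500 ms and min_bytes becomes 1,
    # so the broker answers as soon as any data is available.
    with FetchContext(consumer, True, 0.5):
        msg = next(iter(consumer), None)

    # Non-blocking read: min_bytes is forced to 0, so the broker responds
    # immediately even when no data is available.
    with FetchContext(consumer, False, 0.1):
        msg = next(iter(consumer), None)

Note that __exit__ restores the module-level defaults (FETCH_MAX_WAIT_TIME, FETCH_MIN_BYTES) rather than whatever values were in effect on entry, so stacked contexts with custom settings do not round-trip.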
class Consumer(object):
"""
@@ -27,7 +58,7 @@ class Consumer(object):
* Auto-commit logic
* APIs for fetching pending message count
"""
- def __init__(self, client, topic, partitions=None, auto_commit=True,
+ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
@@ -185,13 +216,15 @@ class SimpleConsumer(Consumer):
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
- # Indicates if partition info will be returned in messages
- self.partition_info = False
+ self.partition_info = False # Do not return partition info in msgs
+ self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+ self.fetch_min_bytes = FETCH_MIN_BYTES
super(SimpleConsumer, self).__init__(client, group, topic,
- auto_commit, partitions,
- auto_commit_every_n,
- auto_commit_every_t)
+ partitions=partitions,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
def provide_partition_info(self):
"""
@@ -242,6 +275,31 @@ class SimpleConsumer(Consumer):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ def get_messages(self, count=1, block=True, timeout=0.1):
+ """
+ Fetch the specified number of messages
+
+ count: Indicates the maximum number of messages to be fetched
+ block: If True, the API will block until some messages are fetched.
+ timeout: If None and block=True, the API will block indefinitely.
+ If > 0, the API will block for the specified time (in seconds).
+ """
+ messages = []
+ iterator = self.__iter__()
+
+ # HACK: Split the timeout between the available partitions
+ # (timeout may be None, in which case there is nothing to split)
+ if timeout is not None:
+     timeout = timeout * 1.0 / len(self.offsets)
+
+ with FetchContext(self, block, timeout):
+ while count > 0:
+ try:
+ messages.append(next(iterator))
+ except StopIteration as exp:
+ break
+ count -= 1
+
+ return messages
+
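With get_messages() in place, callers can do bounded batch reads instead of looping over the consumer indefinitely. A rough usage sketch, in which the broker address, group and topic names are assumptions:

    from kafka.client import KafkaClient
    from kafka.consumer import SimpleConsumer

    client = KafkaClient("localhost", 9092)   # constructor details assumed
    consumer = SimpleConsumer(client, "my-group", "my-topic")

    # Block for up to ~5 seconds while collecting at most 10 messages.
    batch = consumer.get_messages(count=10, block=True, timeout=5)

    # Grab whatever is immediately available without waiting for data.
    backlog = consumer.get_messages(count=100, block=False)

Because the timeout is split across the known partitions, the 5 seconds above is intended as an overall budget for the call rather than a per-partition wait.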
def __iter__(self):
"""
Create an iterator per partition. Iterate through them calling next()
@@ -283,13 +341,23 @@ class SimpleConsumer(Consumer):
the end of this partition.
"""
+ # The offset stored in the consumer is the offset of the message we
+ # have already consumed, so on subsequent iterations we should fetch
+ # the next message (i.e. the next offset).
+ # For the 0th message, however, the offset should be used as-is: an
+ # OffsetFetchRequest to Kafka returns 0 for a new queue, and 0 is the
+ # offset of a message which we have not yet consumed.
if offset != 0:
offset += 1
while True:
# TODO: configure fetch size
req = FetchRequest(self.topic, partition, offset, 1024)
- (resp,) = self.client.send_fetch_request([req])
+
+ (resp,) = self.client.send_fetch_request([req],
+ max_wait_time=self.fetch_max_wait_time,
+ min_bytes=self.fetch_min_bytes)
assert resp.topic == self.topic
assert resp.partition == partition
@@ -297,9 +365,8 @@ class SimpleConsumer(Consumer):
next_offset = None
for message in resp.messages:
next_offset = message.offset
- yield message
- # update the internal state _after_ we yield the message
self.offsets[partition] = message.offset
+ yield message
if next_offset is None:
break
else:
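Recording self.offsets before the yield matters for get_messages(): the generator is simply abandoned once enough messages have been collected, so any code placed after a yield may never run for the last message consumed. A small illustration of the resulting behaviour (client and consumer constructed as in the sketch above):

    batch = consumer.get_messages(count=5, block=True, timeout=1)

    # The offset of each message is recorded before it is yielded, so the
    # internal bookkeeping already reflects the last message in `batch`
    # even though the partition iterator was stopped early.
    for partition, offset in consumer.offsets.items():
        print(partition, offset)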
@@ -338,10 +405,10 @@ class MultiProcessConsumer(Consumer):
# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(client, group, topic,
- auto_commit,
- auto_commit_every_n,
- auto_commit_every_t,
- partitions=None)
+ partitions=partitions,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
# Variables for managing and controlling the data flow from
# consumer child process to master