summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author	Mahendra M <mahendra.m@gmail.com>	2013-06-25 17:01:32 +0530
committer	Mahendra M <mahendra.m@gmail.com>	2013-06-25 17:01:32 +0530
commit	b022be28de85634a642cbd4e0ca4ce89d46d21dd (patch)
tree	a1fa620452eb9b025ea88a71afa6aa8586559057
parent	2e38a5273270df8959279973cbac69e5658ec9a9 (diff)
download	kafka-python-b022be28de85634a642cbd4e0ca4ce89d46d21dd.tar.gz
Implement blocking get_messages for SimpleConsumer
The implementation is done by using simple options to the Kafka Fetch Request. Also, in the SimpleConsumer iterator, the offset is updated before the message is yielded, so that the consumer state is not lost in certain cases. For example: if the message is yielded and consumed by the caller, but the caller never re-enters the generator, the message has been consumed yet the status is not updated in the consumer.
-rw-r--r--kafka/client.py10
-rw-r--r--kafka/consumer.py93
2 files changed, 87 insertions(+), 16 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..a1c2133 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -221,15 +221,19 @@ class KafkaClient(object):
return out
def send_fetch_request(self, payloads=[], fail_on_error=True,
- callback=None):
+ callback=None, max_wait_time=100, min_bytes=4096):
"""
Encode and send a FetchRequest
Payloads are grouped by topic and partition so they can be pipelined
to the same brokers.
"""
- resps = self._send_broker_aware_request(payloads,
- KafkaProtocol.encode_fetch_request,
+
+ encoder = partial(KafkaProtocol.encode_fetch_request,
+ max_wait_time=max_wait_time,
+ min_bytes=min_bytes)
+
+ resps = self._send_broker_aware_request(payloads, encoder,
KafkaProtocol.decode_fetch_response)
out = []
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 2b77d00..c0906ad 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -17,6 +17,37 @@ log = logging.getLogger("kafka")
AUTO_COMMIT_MSG_COUNT = 100
AUTO_COMMIT_INTERVAL = 5000
+FETCH_DEFAULT_BLOCK_TIMEOUT = 1
+FETCH_MAX_WAIT_TIME = 100
+FETCH_MIN_BYTES = 4096
+
+
+class FetchContext(object):
+ """
+ Class for managing the state of a consumer during fetch
+ """
+ def __init__(self, consumer, block, timeout):
+ self.consumer = consumer
+ self.block = block
+
+ if block and not timeout:
+ timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+
+ self.timeout = timeout * 1000
+
+ def __enter__(self):
+ """Set fetch values based on blocking status"""
+ if self.block:
+ self.consumer.fetch_max_wait_time = self.timeout
+ self.consumer.fetch_min_bytes = 1
+ else:
+ self.consumer.fetch_min_bytes = 0
+
+ def __exit__(self, type, value, traceback):
+ """Reset values to default"""
+ self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+ self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
+
class Consumer(object):
"""
@@ -27,7 +58,7 @@ class Consumer(object):
* Auto-commit logic
* APIs for fetching pending message count
"""
- def __init__(self, client, topic, partitions=None, auto_commit=True,
+ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
@@ -185,13 +216,15 @@ class SimpleConsumer(Consumer):
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):
- # Indicates if partition info will be returned in messages
- self.partition_info = False
+ self.partition_info = False # Do not return partition info in msgs
+ self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+ self.fetch_min_bytes = FETCH_MIN_BYTES
super(SimpleConsumer, self).__init__(client, group, topic,
- auto_commit, partitions,
- auto_commit_every_n,
- auto_commit_every_t)
+ partitions=partitions,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
def provide_partition_info(self):
"""
@@ -242,6 +275,31 @@ class SimpleConsumer(Consumer):
else:
raise ValueError("Unexpected value for `whence`, %d" % whence)
+ def get_messages(self, count=1, block=True, timeout=0.1):
+ """
+ Fetch the specified number of messages
+
+ count: Indicates the maximum number of messages to be fetched
+ block: If True, the API will block till some messages are fetched.
+ timeout: If None, and block=True, the API will block infinitely.
+ If >0, API will block for specified time (in seconds)
+ """
+ messages = []
+ iterator = self.__iter__()
+
+ # HACK: This splits the timeout between available partitions
+ timeout = timeout * 1.0 / len(self.offsets)
+
+ with FetchContext(self, block, timeout):
+ while count > 0:
+ try:
+ messages.append(next(iterator))
+ except StopIteration as exp:
+ break
+ count -= 1
+
+ return messages
+
def __iter__(self):
"""
Create an iterate per partition. Iterate through them calling next()
@@ -283,13 +341,23 @@ class SimpleConsumer(Consumer):
the end of this partition.
"""
+ # The offset that is stored in the consumer is the offset that
+ # we have consumed. In subsequent iterations, we are supposed to
+ # fetch the next message (that is from the next offset)
+ # However, for the 0th message, the offset should be as-is.
+ # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
+ # problematic, since 0 is offset of a message which we have not yet
+ # consumed.
if offset != 0:
offset += 1
while True:
# TODO: configure fetch size
req = FetchRequest(self.topic, partition, offset, 1024)
- (resp,) = self.client.send_fetch_request([req])
+
+ (resp,) = self.client.send_fetch_request([req],
+ max_wait_time=self.fetch_max_wait_time,
+ min_bytes=self.fetch_min_bytes)
assert resp.topic == self.topic
assert resp.partition == partition
@@ -297,9 +365,8 @@ class SimpleConsumer(Consumer):
next_offset = None
for message in resp.messages:
next_offset = message.offset
- yield message
- # update the internal state _after_ we yield the message
self.offsets[partition] = message.offset
+ yield message
if next_offset is None:
break
else:
@@ -338,10 +405,10 @@ class MultiProcessConsumer(Consumer):
# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(client, group, topic,
- auto_commit,
- auto_commit_every_n,
- auto_commit_every_t,
- partitions=None)
+ partitions=partitions,
+ auto_commit=auto_commit,
+ auto_commit_every_n=auto_commit_every_n,
+ auto_commit_every_t=auto_commit_every_t)
# Variables for managing and controlling the data flow from
# consumer child process to master