diff options
author | Mahendra M <mahendra.m@gmail.com> | 2013-06-25 17:01:32 +0530 |
---|---|---|
committer | Mahendra M <mahendra.m@gmail.com> | 2013-06-25 17:01:32 +0530 |
commit | b022be28de85634a642cbd4e0ca4ce89d46d21dd (patch) | |
tree | a1fa620452eb9b025ea88a71afa6aa8586559057 | |
parent | 2e38a5273270df8959279973cbac69e5658ec9a9 (diff) | |
download | kafka-python-b022be28de85634a642cbd4e0ca4ce89d46d21dd.tar.gz |
Implement blocking get_messages for SimpleConsumer
The implementation is done by using simple options to
the Kafka Fetch Request
Also in the SimpleConsumer iterator, update the offset before the
message is yielded. This is so that the consumer state is not lost
in certain cases.
For example: the message is yielded and consumed by the caller,
but the caller does not come back into the generator again.
The message will have been consumed, but its status will not be
updated in the consumer.
-rw-r--r-- | kafka/client.py | 10 | ||||
-rw-r--r-- | kafka/consumer.py | 93 |
2 files changed, 87 insertions, 16 deletions
diff --git a/kafka/client.py b/kafka/client.py index 1146798..a1c2133 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -221,15 +221,19 @@ class KafkaClient(object): return out def send_fetch_request(self, payloads=[], fail_on_error=True, - callback=None): + callback=None, max_wait_time=100, min_bytes=4096): """ Encode and send a FetchRequest Payloads are grouped by topic and partition so they can be pipelined to the same brokers. """ - resps = self._send_broker_aware_request(payloads, - KafkaProtocol.encode_fetch_request, + + encoder = partial(KafkaProtocol.encode_fetch_request, + max_wait_time=max_wait_time, + min_bytes=min_bytes) + + resps = self._send_broker_aware_request(payloads, encoder, KafkaProtocol.decode_fetch_response) out = [] diff --git a/kafka/consumer.py b/kafka/consumer.py index 2b77d00..c0906ad 100644 --- a/kafka/consumer.py +++ b/kafka/consumer.py @@ -17,6 +17,37 @@ log = logging.getLogger("kafka") AUTO_COMMIT_MSG_COUNT = 100 AUTO_COMMIT_INTERVAL = 5000 +FETCH_DEFAULT_BLOCK_TIMEOUT = 1 +FETCH_MAX_WAIT_TIME = 100 +FETCH_MIN_BYTES = 4096 + + +class FetchContext(object): + """ + Class for managing the state of a consumer during fetch + """ + def __init__(self, consumer, block, timeout): + self.consumer = consumer + self.block = block + + if block and not timeout: + timeout = FETCH_DEFAULT_BLOCK_TIMEOUT + + self.timeout = timeout * 1000 + + def __enter__(self): + """Set fetch values based on blocking status""" + if self.block: + self.consumer.fetch_max_wait_time = self.timeout + self.consumer.fetch_min_bytes = 1 + else: + self.consumer.fetch_min_bytes = 0 + + def __exit__(self, type, value, traceback): + """Reset values to default""" + self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME + self.consumer.fetch_min_bytes = FETCH_MIN_BYTES + class Consumer(object): """ @@ -27,7 +58,7 @@ class Consumer(object): * Auto-commit logic * APIs for fetching pending message count """ - def __init__(self, client, topic, partitions=None, auto_commit=True, + 
def __init__(self, client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): @@ -185,13 +216,15 @@ class SimpleConsumer(Consumer): auto_commit_every_n=AUTO_COMMIT_MSG_COUNT, auto_commit_every_t=AUTO_COMMIT_INTERVAL): - # Indicates if partition info will be returned in messages - self.partition_info = False + self.partition_info = False # Do not return partition info in msgs + self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME + self.fetch_min_bytes = FETCH_MIN_BYTES super(SimpleConsumer, self).__init__(client, group, topic, - auto_commit, partitions, - auto_commit_every_n, - auto_commit_every_t) + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) def provide_partition_info(self): """ @@ -242,6 +275,31 @@ class SimpleConsumer(Consumer): else: raise ValueError("Unexpected value for `whence`, %d" % whence) + def get_messages(self, count=1, block=True, timeout=0.1): + """ + Fetch the specified number of messages + + count: Indicates the maximum number of messages to be fetched + block: If True, the API will block till some messages are fetched. + timeout: If None, and block=True, the API will block infinitely. + If >0, API will block for specified time (in seconds) + """ + messages = [] + iterator = self.__iter__() + + # HACK: This splits the timeout between available partitions + timeout = timeout * 1.0 / len(self.offsets) + + with FetchContext(self, block, timeout): + while count > 0: + try: + messages.append(next(iterator)) + except StopIteration as exp: + break + count -= 1 + + return messages + def __iter__(self): """ Create an iterate per partition. Iterate through them calling next() @@ -283,13 +341,23 @@ class SimpleConsumer(Consumer): the end of this partition. """ + # The offset that is stored in the consumer is the offset that + # we have consumed. 
In subsequent iterations, we are supposed to + # fetch the next message (that is from the next offset) + # However, for the 0th message, the offset should be as-is. + # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is + # problematic, since 0 is offset of a message which we have not yet + # consumed. if offset != 0: offset += 1 while True: # TODO: configure fetch size req = FetchRequest(self.topic, partition, offset, 1024) - (resp,) = self.client.send_fetch_request([req]) + + (resp,) = self.client.send_fetch_request([req], + max_wait_time=self.fetch_max_wait_time, + min_bytes=self.fetch_min_bytes) assert resp.topic == self.topic assert resp.partition == partition @@ -297,9 +365,8 @@ class SimpleConsumer(Consumer): next_offset = None for message in resp.messages: next_offset = message.offset - yield message - # update the internal state _after_ we yield the message self.offsets[partition] = message.offset + yield message if next_offset is None: break else: @@ -338,10 +405,10 @@ class MultiProcessConsumer(Consumer): # Initiate the base consumer class super(MultiProcessConsumer, self).__init__(client, group, topic, - auto_commit, - auto_commit_every_n, - auto_commit_every_t, - partitions=None) + partitions=partitions, + auto_commit=auto_commit, + auto_commit_every_n=auto_commit_every_n, + auto_commit_every_t=auto_commit_every_t) # Variables for managing and controlling the data flow from # consumer child process to master |