diff options
 kafka/client.py   | 10 +++++++---
 kafka/consumer.py | 93 +++++++++++++++++++++++++++++++++++++++++----------
 2 files changed, 87 insertions(+), 16 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 1146798..a1c2133 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -221,15 +221,19 @@ class KafkaClient(object):
         return out
 
     def send_fetch_request(self, payloads=[], fail_on_error=True,
-                           callback=None):
+                           callback=None, max_wait_time=100, min_bytes=4096):
         """
         Encode and send a FetchRequest
 
         Payloads are grouped by topic and partition so they can be pipelined
         to the same brokers.
         """
-        resps = self._send_broker_aware_request(payloads,
-                                                KafkaProtocol.encode_fetch_request,
+
+        encoder = partial(KafkaProtocol.encode_fetch_request,
+                          max_wait_time=max_wait_time,
+                          min_bytes=min_bytes)
+
+        resps = self._send_broker_aware_request(payloads, encoder,
                                                 KafkaProtocol.decode_fetch_response)
 
         out = []
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 2b77d00..c0906ad 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -17,6 +17,37 @@ log = logging.getLogger("kafka")
 AUTO_COMMIT_MSG_COUNT = 100
 AUTO_COMMIT_INTERVAL = 5000
 
+FETCH_DEFAULT_BLOCK_TIMEOUT = 1
+FETCH_MAX_WAIT_TIME = 100
+FETCH_MIN_BYTES = 4096
+
+
+class FetchContext(object):
+    """
+    Class for managing the state of a consumer during fetch
+    """
+    def __init__(self, consumer, block, timeout):
+        self.consumer = consumer
+        self.block = block
+
+        if block and not timeout:
+            timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
+
+        self.timeout = timeout * 1000
+
+    def __enter__(self):
+        """Set fetch values based on blocking status"""
+        if self.block:
+            self.consumer.fetch_max_wait_time = self.timeout
+            self.consumer.fetch_min_bytes = 1
+        else:
+            self.consumer.fetch_min_bytes = 0
+
+    def __exit__(self, type, value, traceback):
+        """Reset values to default"""
+        self.consumer.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+        self.consumer.fetch_min_bytes = FETCH_MIN_BYTES
+
 
 class Consumer(object):
     """
@@ -27,7 +58,7 @@ class Consumer(object):
      * Auto-commit logic
      * APIs for fetching pending message count
     """
-    def __init__(self, client, topic, partitions=None, auto_commit=True,
+    def __init__(self, client, group, topic, partitions=None, auto_commit=True,
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL):
 
@@ -185,13 +216,15 @@ class SimpleConsumer(Consumer):
                  auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
                  auto_commit_every_t=AUTO_COMMIT_INTERVAL):
 
-        # Indicates if partition info will be returned in messages
-        self.partition_info = False
+        self.partition_info = False     # Do not return partition info in msgs
+        self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
+        self.fetch_min_bytes = FETCH_MIN_BYTES
 
         super(SimpleConsumer, self).__init__(client, group, topic,
-                                             auto_commit, partitions,
-                                             auto_commit_every_n,
-                                             auto_commit_every_t)
+                                             partitions=partitions,
+                                             auto_commit=auto_commit,
+                                             auto_commit_every_n=auto_commit_every_n,
+                                             auto_commit_every_t=auto_commit_every_t)
 
     def provide_partition_info(self):
         """
@@ -242,6 +275,31 @@ class SimpleConsumer(Consumer):
         else:
             raise ValueError("Unexpected value for `whence`, %d" % whence)
 
+    def get_messages(self, count=1, block=True, timeout=0.1):
+        """
+        Fetch the specified number of messages
+
+        count: Indicates the maximum number of messages to be fetched
+        block: If True, the API will block till some messages are fetched.
+        timeout: If None, and block=True, the API will block infinitely.
+                 If >0, API will block for specified time (in seconds)
+        """
+        messages = []
+        iterator = self.__iter__()
+
+        # HACK: This splits the timeout between available partitions
+        timeout = timeout * 1.0 / len(self.offsets)
+
+        with FetchContext(self, block, timeout):
+            while count > 0:
+                try:
+                    messages.append(next(iterator))
+                except StopIteration as exp:
+                    break
+                count -= 1
+
+        return messages
+
     def __iter__(self):
         """
         Create an iterate per partition. Iterate through them calling next()
@@ -283,13 +341,23 @@ class SimpleConsumer(Consumer):
            the end of this partition.
         """
 
+        # The offset that is stored in the consumer is the offset that
+        # we have consumed. In subsequent iterations, we are supposed to
+        # fetch the next message (that is from the next offset)
+        # However, for the 0th message, the offset should be as-is.
+        # An OffsetFetchRequest to Kafka gives 0 for a new queue. This is
+        # problematic, since 0 is offset of a message which we have not yet
+        # consumed.
         if offset != 0:
             offset += 1
 
         while True:
             # TODO: configure fetch size
             req = FetchRequest(self.topic, partition, offset, 1024)
-            (resp,) = self.client.send_fetch_request([req])
+
+            (resp,) = self.client.send_fetch_request([req],
+                                                     max_wait_time=self.fetch_max_wait_time,
+                                                     min_bytes=self.fetch_min_bytes)
 
             assert resp.topic == self.topic
             assert resp.partition == partition
@@ -297,9 +365,8 @@ class SimpleConsumer(Consumer):
             next_offset = None
             for message in resp.messages:
                 next_offset = message.offset
-                yield message
-                # update the internal state _after_ we yield the message
                 self.offsets[partition] = message.offset
+                yield message
 
             if next_offset is None:
                 break
             else:
@@ -338,10 +405,10 @@ class MultiProcessConsumer(Consumer):
 
         # Initiate the base consumer class
         super(MultiProcessConsumer, self).__init__(client, group, topic,
-                                                   auto_commit,
-                                                   auto_commit_every_n,
-                                                   auto_commit_every_t,
-                                                   partitions=None)
+                                                   partitions=partitions,
+                                                   auto_commit=auto_commit,
+                                                   auto_commit_every_n=auto_commit_every_n,
+                                                   auto_commit_every_t=auto_commit_every_t)
 
         # Variables for managing and controlling the data flow from
         # consumer child process to master