diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/client.py | 10 | ||||
-rw-r--r-- | kafka/consumer/kafka.py | 4 | ||||
-rw-r--r-- | kafka/producer/base.py | 35 | ||||
-rw-r--r-- | kafka/version.py | 2 |
4 files changed, 34 insertions, 17 deletions
diff --git a/kafka/client.py b/kafka/client.py index 810fa46..9018bb4 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -236,13 +236,13 @@ class KafkaClient(object): responses[topic_partition] = None continue else: - connections_by_socket[conn.get_connected_socket()] = (conn, broker) + connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId) conn = None while connections_by_socket: sockets = connections_by_socket.keys() rlist, _, _ = select.select(sockets, [], [], None) - conn, broker = connections_by_socket.pop(rlist[0]) + conn, broker, requestId = connections_by_socket.pop(rlist[0]) try: response = conn.recv(requestId) except ConnectionError as e: @@ -607,11 +607,7 @@ class KafkaClient(object): else: decoder = KafkaProtocol.decode_produce_response - try: - resps = self._send_broker_aware_request(payloads, encoder, decoder) - except Exception: - if fail_on_error: - raise + resps = self._send_broker_aware_request(payloads, encoder, decoder) return [resp if not callback else callback(resp) for resp in resps if resp is not None and diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py index 21b2bf6..3ef106c 100644 --- a/kafka/consumer/kafka.py +++ b/kafka/consumer/kafka.py @@ -269,6 +269,10 @@ class KafkaConsumer(object): # Reset message iterator in case we were in the middle of one self._reset_message_iterator() + def close(self): + """Close this consumer's underlying client.""" + self._client.close() + def next(self): """Return the next available message diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 8774c66..39b1f84 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -78,9 +78,17 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, retrying messages after stop_event is set, defaults to 30. """ request_tries = {} - client.reinit() - stop_at = None + while not stop_event.is_set(): + try: + client.reinit() + except Exception as e: + log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms) + time.sleep(float(retry_options.backoff_ms) / 1000) + else: + break + + stop_at = None while not (stop_event.is_set() and queue.empty() and not request_tries): # Handle stop_timeout @@ -407,17 +415,26 @@ class Producer(object): raise return resp - def stop(self, timeout=1): + def stop(self, timeout=None): """ - Stop the producer. Optionally wait for the specified timeout before - forcefully cleaning up. + Stop the producer (async mode). Blocks until async thread completes. """ + if timeout is not None: + log.warning('timeout argument to stop() is deprecated - ' + 'it will be removed in future release') + + if not self.async: + log.warning('producer.stop() called, but producer is not async') + return + + if self.stopped: + log.warning('producer.stop() called, but producer is already stopped') + return + if self.async: self.queue.put((STOP_ASYNC_PRODUCER, None, None)) - self.thread.join(timeout) - - if self.thread.is_alive(): - self.thread_stop_event.set() + self.thread_stop_event.set() + self.thread.join() if hasattr(self, '_cleanup_func'): # Remove cleanup handler now that we've stopped diff --git a/kafka/version.py b/kafka/version.py index cd64b48..9272695 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1 @@ -__version__ = '0.9.5-dev' +__version__ = '0.9.5' |