summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py10
-rw-r--r--kafka/consumer/kafka.py4
-rw-r--r--kafka/producer/base.py35
-rw-r--r--kafka/version.py2
4 files changed, 34 insertions, 17 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 810fa46..9018bb4 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -236,13 +236,13 @@ class KafkaClient(object):
responses[topic_partition] = None
continue
else:
- connections_by_socket[conn.get_connected_socket()] = (conn, broker)
+ connections_by_socket[conn.get_connected_socket()] = (conn, broker, requestId)
conn = None
while connections_by_socket:
sockets = connections_by_socket.keys()
rlist, _, _ = select.select(sockets, [], [], None)
- conn, broker = connections_by_socket.pop(rlist[0])
+ conn, broker, requestId = connections_by_socket.pop(rlist[0])
try:
response = conn.recv(requestId)
except ConnectionError as e:
@@ -607,11 +607,7 @@ class KafkaClient(object):
else:
decoder = KafkaProtocol.decode_produce_response
- try:
- resps = self._send_broker_aware_request(payloads, encoder, decoder)
- except Exception:
- if fail_on_error:
- raise
+ resps = self._send_broker_aware_request(payloads, encoder, decoder)
return [resp if not callback else callback(resp) for resp in resps
if resp is not None and
diff --git a/kafka/consumer/kafka.py b/kafka/consumer/kafka.py
index 21b2bf6..3ef106c 100644
--- a/kafka/consumer/kafka.py
+++ b/kafka/consumer/kafka.py
@@ -269,6 +269,10 @@ class KafkaConsumer(object):
# Reset message iterator in case we were in the middle of one
self._reset_message_iterator()
+ def close(self):
+ """Close this consumer's underlying client."""
+ self._client.close()
+
def next(self):
"""Return the next available message
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 8774c66..39b1f84 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -78,9 +78,17 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
retrying messages after stop_event is set, defaults to 30.
"""
request_tries = {}
- client.reinit()
- stop_at = None
+ while not stop_event.is_set():
+ try:
+ client.reinit()
+ except Exception as e:
+ log.warn('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms)
+ time.sleep(float(retry_options.backoff_ms) / 1000)
+ else:
+ break
+
+ stop_at = None
while not (stop_event.is_set() and queue.empty() and not request_tries):
# Handle stop_timeout
@@ -407,17 +415,26 @@ class Producer(object):
raise
return resp
- def stop(self, timeout=1):
+ def stop(self, timeout=None):
"""
- Stop the producer. Optionally wait for the specified timeout before
- forcefully cleaning up.
+ Stop the producer (async mode). Blocks until async thread completes.
"""
+ if timeout is not None:
+ log.warning('timeout argument to stop() is deprecated - '
+ 'it will be removed in future release')
+
+ if not self.async:
+ log.warning('producer.stop() called, but producer is not async')
+ return
+
+ if self.stopped:
+ log.warning('producer.stop() called, but producer is already stopped')
+ return
+
if self.async:
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
- self.thread.join(timeout)
-
- if self.thread.is_alive():
- self.thread_stop_event.set()
+ self.thread_stop_event.set()
+ self.thread.join()
if hasattr(self, '_cleanup_func'):
# Remove cleanup handler now that we've stopped
diff --git a/kafka/version.py b/kafka/version.py
index cd64b48..9272695 100644
--- a/kafka/version.py
+++ b/kafka/version.py
@@ -1 +1 @@
-__version__ = '0.9.5-dev'
+__version__ = '0.9.5'