summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/conn.py1
-rw-r--r--kafka/producer/base.py41
2 files changed, 17 insertions, 25 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 8142c45..ea55481 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -170,7 +170,6 @@ class KafkaConnection(local):
c.port = copy.copy(self.port)
c.timeout = copy.copy(self.timeout)
c._sock = None
- c._dirty = True
return c
def close(self):
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index a9288d9..bb7fd43 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -3,11 +3,10 @@ from __future__ import absolute_import
import logging
import time
-from Queue import Queue
try:
- from queue import Empty
+ from queue import Empty, Queue
except ImportError:
- from Queue import Empty
+ from Queue import Empty, Queue
from collections import defaultdict
from threading import Thread
@@ -33,13 +32,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
request
-
- NOTE: Ideally, this should have been a method inside the Producer
- class. However, multiprocessing module has issues in windows. The
- functionality breaks unless this function is kept outside of a class
"""
stop = False
- client.reinit()
while not stop:
timeout = batch_time
@@ -142,18 +136,20 @@ class Producer(object):
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
- self.proc = Thread(target=_send_upstream,
- args=(self.queue,
- self.client.copy(),
- self.codec,
- batch_send_every_t,
- batch_send_every_n,
- self.req_acks,
- self.ack_timeout))
-
- # Process will die if main thread exits
- self.proc.daemon = True
- self.proc.start()
+ self.thread = Thread(target=_send_upstream,
+ args=(self.queue,
+ self.client.copy(),
+ self.codec,
+ batch_send_every_t,
+ batch_send_every_n,
+ self.req_acks,
+ self.ack_timeout))
+
+ # Thread will die if main thread exits
+ self.thread.daemon = True
+ self.thread.start()
+
+
def send_messages(self, topic, partition, *msg):
"""
@@ -210,7 +206,4 @@ class Producer(object):
"""
if self.async:
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
- self.proc.join(timeout)
-
- if self.proc.is_alive():
- raise SystemError("Can't join Kafka async thread")
+ self.thread.join(timeout)