summaryrefslogtreecommitdiff
path: root/kafka/consumer/multiprocess.py
diff options
context:
space:
mode:
authorViktor Shlapakov <vshlapakov@gmail.com>2015-03-12 11:33:07 +0300
committerViktor Shlapakov <vshlapakov@gmail.com>2015-03-12 11:33:07 +0300
commit01ea3bf968c76a5f7a1999cfca36766d9bbff5e7 (patch)
treefa395d043365ada33d59e9eff4f8e82ddfb3eaad /kafka/consumer/multiprocess.py
parent4bab2fa5d1bc67e18b2f7791ff5fbb8e73143a5e (diff)
downloadkafka-python-01ea3bf968c76a5f7a1999cfca36766d9bbff5e7.tar.gz
Used thread-safe dict.copy().keys() for MP consumer partitions
Diffstat (limited to 'kafka/consumer/multiprocess.py')
-rw-r--r--kafka/consumer/multiprocess.py5
1 files changed, 4 insertions, 1 deletions
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
index db59f7b..bec3100 100644
--- a/kafka/consumer/multiprocess.py
+++ b/kafka/consumer/multiprocess.py
@@ -123,7 +123,10 @@ class MultiProcessConsumer(Consumer):
self.pause = Event() # Requests the consumers to pause fetch
self.size = Value('i', 0) # Indicator of number of messages to fetch
- partitions = list(self.offsets.keys())
+ # dict.keys() returns a view in py3 + it's not a thread-safe operation
+ # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
+ # It's safer to copy dict as it only runs during the init.
+ partitions = list(self.offsets.copy().keys())
# By default, start one consumer process for all partitions
# The logic below ensures that