summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorMark Roberts <wizzat@gmail.com>2014-04-25 10:55:04 -0700
committerMark Roberts <wizzat@gmail.com>2014-04-25 10:55:04 -0700
commit57913f9f914a959f52bc9040a172f8c9ff77e491 (patch)
treefe5cc6c14283a4c9d9175a748ef97f7d55df6fd7 /kafka
parent0e50f33ec678f6d656d488ce8a4537f95bba003e (diff)
downloadkafka-python-57913f9f914a959f52bc9040a172f8c9ff77e491.tar.gz
Various fixes
Bump version number to 0.9.1. Update readme to show supported Kafka/Python versions. Validate arguments in consumer.py, add initial consumer unit test. Make service kill() child processes when startup fails. Add tests for util.py, fix Python 2.6 specific bug.
Diffstat (limited to 'kafka')
-rw-r--r--kafka/consumer.py3
-rw-r--r--kafka/util.py8
2 files changed, 9 insertions, 2 deletions
diff --git a/kafka/consumer.py b/kafka/consumer.py
index 3f8d8c2..98f18a0 100644
--- a/kafka/consumer.py
+++ b/kafka/consumer.py
@@ -3,6 +3,7 @@ from __future__ import absolute_import
from itertools import izip_longest, repeat
import logging
import time
+import numbers
from threading import Lock
from multiprocessing import Process, Queue as MPQueue, Event, Value
from Queue import Empty, Queue
@@ -81,6 +82,8 @@ class Consumer(object):
if not partitions:
partitions = self.client.topic_partitions[topic]
+ else:
+ assert all(isinstance(x, numbers.Integral) for x in partitions)
# Variables for handling offset commits
self.commit_lock = Lock()
diff --git a/kafka/util.py b/kafka/util.py
index 54052fb..0577a88 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,5 +1,6 @@
-from collections import defaultdict
+import sys
import struct
+import collections
from threading import Thread, Event
from kafka.common import BufferUnderflowError
@@ -15,6 +16,9 @@ def write_int_string(s):
def write_short_string(s):
if s is None:
return struct.pack('>h', -1)
+ elif len(s) > 32767 and sys.version_info < (2,7):
+ # Python 2.6 issues a deprecation warning instead of a struct error
+ raise struct.error(len(s))
else:
return struct.pack('>h%ds' % len(s), len(s), s)
@@ -63,7 +67,7 @@ def relative_unpack(fmt, data, cur):
def group_by_topic_and_partition(tuples):
- out = defaultdict(dict)
+ out = collections.defaultdict(dict)
for t in tuples:
out[t.topic][t.partition] = t
return out