summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorTaras <voyn1991@gmail.com>2018-03-19 00:09:29 +0200
committerDana Powers <dana.powers@gmail.com>2018-04-18 13:41:14 -0700
commitd9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0 (patch)
tree1e2c0f300a8a8981c1e170505899a205f67308e5 /kafka
parent1c71dfc3c321372c808f45f569ae41352f420e8f (diff)
downloadkafka-python-d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0.tar.gz
Fix MemoryRecord bugs re error handling and add test coverage (#1448)
Diffstat (limited to 'kafka')
-rw-r--r--kafka/record/__init__.py4
-rw-r--r--kafka/record/default_records.py2
-rw-r--r--kafka/record/memory_records.py8
3 files changed, 7 insertions, 7 deletions
diff --git a/kafka/record/__init__.py b/kafka/record/__init__.py
index cbd70d9..93936df 100644
--- a/kafka/record/__init__.py
+++ b/kafka/record/__init__.py
@@ -1,3 +1,3 @@
-from kafka.record.memory_records import MemoryRecords
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
-__all__ = ["MemoryRecords"]
+__all__ = ["MemoryRecords", "MemoryRecordsBuilder"]
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 2bbd47e..840868a 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -237,7 +237,7 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch):
# validate whether we have read all header bytes in the current record
if pos - start_pos != length:
- CorruptRecordException(
+ raise CorruptRecordException(
"Invalid record size: expected to read {} bytes in record "
"payload, but instead read {}".format(length, pos - start_pos))
self._pos = pos
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index cb1cc01..f67c4fe 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -18,6 +18,7 @@
#
# So we can iterate over batches just by knowing offsets of Length. Magic is
# used to construct the correct class for Batch itself.
+from __future__ import division
import struct
@@ -131,15 +132,14 @@ class MemoryRecordsBuilder(object):
def append(self, timestamp, key, value, headers=[]):
""" Append a message to the buffer.
- Returns:
- (int, int): checksum and bytes written
+ Returns: RecordMetadata or None if unable to append
"""
if self._closed:
- return None, 0
+ return None
offset = self._next_offset
metadata = self._builder.append(offset, timestamp, key, value, headers)
- # Return of 0 size means there's no space to add a new message
+ # Return of None means there's no space to add a new message
if metadata is None:
return None