diff options
author | Taras <voyn1991@gmail.com> | 2018-03-19 00:09:29 +0200 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2018-04-18 13:41:14 -0700 |
commit | d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0 (patch) | |
tree | 1e2c0f300a8a8981c1e170505899a205f67308e5 /kafka | |
parent | 1c71dfc3c321372c808f45f569ae41352f420e8f (diff) | |
download | kafka-python-d9e41c8e8fb7033a3e9a9a7654bc2b0125f337a0.tar.gz |
Fix MemoryRecord bugs re error handling and add test coverage (#1448)
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/record/__init__.py | 4 | ||||
-rw-r--r-- | kafka/record/default_records.py | 2 | ||||
-rw-r--r-- | kafka/record/memory_records.py | 8 |
3 files changed, 7 insertions, 7 deletions
diff --git a/kafka/record/__init__.py b/kafka/record/__init__.py index cbd70d9..93936df 100644 --- a/kafka/record/__init__.py +++ b/kafka/record/__init__.py @@ -1,3 +1,3 @@ -from kafka.record.memory_records import MemoryRecords +from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder -__all__ = ["MemoryRecords"] +__all__ = ["MemoryRecords", "MemoryRecordsBuilder"] diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 2bbd47e..840868a 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -237,7 +237,7 @@ class DefaultRecordBatch(DefaultRecordBase, ABCRecordBatch): # validate whether we have read all header bytes in the current record if pos - start_pos != length: - CorruptRecordException( + raise CorruptRecordException( "Invalid record size: expected to read {} bytes in record " "payload, but instead read {}".format(length, pos - start_pos)) self._pos = pos diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index cb1cc01..f67c4fe 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -18,6 +18,7 @@ # # So we can iterate over batches just by knowing offsets of Length. Magic is # used to construct the correct class for Batch itself. +from __future__ import division import struct @@ -131,15 +132,14 @@ class MemoryRecordsBuilder(object): def append(self, timestamp, key, value, headers=[]): """ Append a message to the buffer. - Returns: - (int, int): checksum and bytes written + Returns: RecordMetadata or None if unable to append """ if self._closed: - return None, 0 + return None offset = self._next_offset metadata = self._builder.append(offset, timestamp, key, value, headers) - # Return of 0 size means there's no space to add a new message + # Return of None means there's no space to add a new message if metadata is None: return None |