From fbea5f04bccd28f3aa15a1711548b131504591ac Mon Sep 17 00:00:00 2001 From: Taras Date: Tue, 10 Oct 2017 00:13:16 +0300 Subject: Refactor MessageSet and Message into LegacyRecordBatch to later support v2 message format --- kafka/protocol/message.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) (limited to 'kafka/protocol/message.py') diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 70d5b36..f5a51a9 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -154,12 +154,13 @@ class MessageSet(AbstractType): HEADER_SIZE = 12 # offset + message_size @classmethod - def encode(cls, items): + def encode(cls, items, prepend_size=True): # RecordAccumulator encodes messagesets internally if isinstance(items, (io.BytesIO, KafkaBytes)): size = Int32.decode(items) - # rewind and return all the bytes - items.seek(items.tell() - 4) + if prepend_size: + # rewind and return all the bytes + items.seek(items.tell() - 4) return items.read(size + 4) encoded_values = [] @@ -167,7 +168,10 @@ class MessageSet(AbstractType): encoded_values.append(Int64.encode(offset)) encoded_values.append(Bytes.encode(message)) encoded = b''.join(encoded_values) - return Bytes.encode(encoded) + if prepend_size: + return Bytes.encode(encoded) + else: + return encoded @classmethod def decode(cls, data, bytes_to_read=None): -- cgit v1.2.1