diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-10-14 23:06:27 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-14 23:06:27 +0300 |
commit | fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f (patch) | |
tree | 52e5860b1f8738b15e7c757c205961b761badd2b /benchmarks/record_batch_compose.py | |
parent | dd8e33654f2270097d6c1373dc272153670e48f8 (diff) | |
parent | 365cae02da59721df77923bb5f5a2d94a84b2e83 (diff) | |
download | kafka-python-fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f.tar.gz |
Merge pull request #1252 from dpkp/legacy_records_refactor
Refactor MessageSet and Message into LegacyRecordBatch
Diffstat (limited to 'benchmarks/record_batch_compose.py')
-rw-r--r-- | benchmarks/record_batch_compose.py | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/benchmarks/record_batch_compose.py b/benchmarks/record_batch_compose.py new file mode 100644 index 0000000..86012df --- /dev/null +++ b/benchmarks/record_batch_compose.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3 +from __future__ import print_function +import hashlib +import itertools +import os +import random + +import perf + +from kafka.record.memory_records import MemoryRecordsBuilder + + +DEFAULT_BATCH_SIZE = 1600 * 1024 +KEY_SIZE = 6 +VALUE_SIZE = 60 +TIMESTAMP_RANGE = [1505824130000, 1505824140000] + +# With values above v1 record is 100 bytes, so 10 000 bytes for 100 messages +MESSAGES_PER_BATCH = 100 + + +def random_bytes(length): + buffer = bytearray(length) + for i in range(length): + buffer[i] = random.randint(0, 255) + return bytes(buffer) + + +def prepare(): + return iter(itertools.cycle([ + (random_bytes(KEY_SIZE), + random_bytes(VALUE_SIZE), + random.randint(*TIMESTAMP_RANGE) + ) + for _ in range(int(MESSAGES_PER_BATCH * 1.94)) + ])) + + +def finalize(results): + # Just some strange code to make sure PyPy does execute the main code + # properly, without optimizing it away + hash_val = hashlib.md5() + for buf in results: + hash_val.update(buf) + print(hash_val, file=open(os.devnull, "w")) + + +def func(loops, magic): + # Jit can optimize out the whole function if the result is the same each + # time, so we need some randomized input data ) + precomputed_samples = prepare() + results = [] + + # Main benchmark code. + t0 = perf.perf_counter() + for _ in range(loops): + batch = MemoryRecordsBuilder( + magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0) + for _ in range(MESSAGES_PER_BATCH): + key, value, timestamp = next(precomputed_samples) + size = batch.append(timestamp=timestamp, key=key, value=value) + assert size + batch.close() + results.append(batch.buffer()) + + res = perf.perf_counter() - t0 + + finalize(results) + + return res + + +runner = perf.Runner() +runner.bench_time_func('batch_append_v0', func, 0) +runner.bench_time_func('batch_append_v1', func, 1) |