diff options
Diffstat (limited to 'benchmarks/record_batch_compose.py')
-rw-r--r-- | benchmarks/record_batch_compose.py | 75 |
1 files changed, 75 insertions, 0 deletions
#!/usr/bin/env python3
# Origin: benchmarks/record_batch_compose.py
# (new file, mode 100644, index 0000000..86012df)
"""Benchmark composing record batches with ``MemoryRecordsBuilder``.

Each benchmark iteration builds one batch of ``MESSAGES_PER_BATCH`` records
out of pre-generated random keys, values and timestamps, closes the batch
and keeps the resulting buffer.  The benchmark is registered twice with the
``perf`` runner: once with ``magic=0`` and once with ``magic=1``.
"""
from __future__ import print_function
import hashlib
import itertools
import os
import random

import perf

from kafka.record.memory_records import MemoryRecordsBuilder


DEFAULT_BATCH_SIZE = 1600 * 1024
KEY_SIZE = 6
VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]

# With values above v1 record is 100 bytes, so 10 000 bytes for 100 messages
MESSAGES_PER_BATCH = 100


def random_bytes(length):
    """Return ``length`` random bytes drawn from the ``random`` module.

    One ``random.randint(0, 255)`` call is made per byte, in order, so the
    output is reproducible when the ``random`` module is seeded.
    """
    return bytes(bytearray(random.randint(0, 255) for _ in range(length)))


def prepare():
    """Return an endless iterator over pre-generated sample records.

    Each sample is a ``(key, value, timestamp)`` tuple with a ``KEY_SIZE``
    byte key, a ``VALUE_SIZE`` byte value and a timestamp picked uniformly
    from ``TIMESTAMP_RANGE`` (both ends inclusive).
    """
    # 194 samples for a 100-message batch: the pool size is not a multiple
    # of the batch size, so consecutive batches start at different offsets
    # in the cycle.  NOTE(review): presumably that is why 1.94 was chosen,
    # to keep batch contents from repeating exactly - confirm.
    sample_count = int(MESSAGES_PER_BATCH * 1.94)
    samples = [
        (
            random_bytes(KEY_SIZE),
            random_bytes(VALUE_SIZE),
            random.randint(*TIMESTAMP_RANGE),
        )
        for _ in range(sample_count)
    ]
    # itertools.cycle already returns an iterator; no extra iter() needed.
    return itertools.cycle(samples)


def finalize(results):
    """Consume every produced buffer so the benchmarked work stays live.

    Just some strange code to make sure PyPy does execute the main code
    properly, without optimizing it away: all buffers are folded into one
    MD5 digest, which is then written to the null device.
    """
    digest = hashlib.md5()
    for buf in results:
        digest.update(buf)
    # Use a context manager so the devnull handle is closed on every call
    # instead of being leaked, and print the digest itself (not the hash
    # object's repr) so the output really depends on the data.
    with open(os.devnull, "w") as devnull:
        print(digest.hexdigest(), file=devnull)


def func(loops, magic):
    """Time ``loops`` rounds of building one full record batch.

    Args:
        loops: Number of batches to build inside the timed region.
        magic: Value passed as the first argument of
            ``MemoryRecordsBuilder``; the runner below uses 0 and 1.

    Returns:
        Elapsed time for the timed region, measured as the difference of
        two ``perf.perf_counter()`` readings.  Sample generation and
        ``finalize`` are excluded from it.
    """
    # Jit can optimize out the whole function if the result is the same each
    # time, so we need some randomized input data
    precomputed_samples = prepare()
    results = []

    # Main benchmark code.
    t0 = perf.perf_counter()
    for _loop in range(loops):
        batch = MemoryRecordsBuilder(
            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
        for _msg in range(MESSAGES_PER_BATCH):
            key, value, timestamp = next(precomputed_samples)
            size = batch.append(timestamp=timestamp, key=key, value=value)
            # Sanity check only: a falsy result presumably means the record
            # was not appended (verify against MemoryRecordsBuilder.append).
            assert size
        batch.close()
        results.append(batch.buffer())

    res = perf.perf_counter() - t0

    finalize(results)

    return res


runner = perf.Runner()
runner.bench_time_func('batch_append_v0', func, 0)
runner.bench_time_func('batch_append_v1', func, 1)