author      Taras Voinarovskyi <voyn1991@gmail.com>    2017-10-14 23:06:27 +0300
committer   GitHub <noreply@github.com>                2017-10-14 23:06:27 +0300
commit      fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f (patch)
tree        52e5860b1f8738b15e7c757c205961b761badd2b /benchmarks
parent      dd8e33654f2270097d6c1373dc272153670e48f8 (diff)
parent      365cae02da59721df77923bb5f5a2d94a84b2e83 (diff)
download    kafka-python-fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f.tar.gz
Merge pull request #1252 from dpkp/legacy_records_refactor
Refactor MessageSet and Message into LegacyRecordBatch
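For context, a minimal sketch of the round trip the refactored record API supports, using only the calls exercised by the benchmarks in this diff (MemoryRecordsBuilder to compose a batch, MemoryRecords to read it back); the magic value, batch size, and sample payloads here are arbitrary illustration, not defaults:

from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder

# Build a small v1 batch, as record_batch_compose.py does.
builder = MemoryRecordsBuilder(1, batch_size=16 * 1024, compression_type=0)
builder.append(timestamp=1505824130000, key=b"key", value=b"value")
builder.close()

# Read it back and validate it, as record_batch_read.py does.
records = MemoryRecords(bytes(builder.buffer()))
while records.has_next():
    batch = records.next_batch()
    batch.validate_crc()
    for record in batch:
        print(record.value)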
Diffstat (limited to 'benchmarks')
-rw-r--r--   benchmarks/README                      4
-rw-r--r--   benchmarks/record_batch_compose.py    75
-rw-r--r--   benchmarks/record_batch_read.py       80
3 files changed, 159 insertions, 0 deletions
diff --git a/benchmarks/README b/benchmarks/README
new file mode 100644
index 0000000..369e8b6
--- /dev/null
+++ b/benchmarks/README
@@ -0,0 +1,4 @@
+The `record_batch_*` benchmarks in this section are written using the
+``perf`` library, created by Victor Stinner. For more information on how
+to get reliable benchmark results, please consult
+http://perf.readthedocs.io/en/latest/run_benchmark.html.
diff --git a/benchmarks/record_batch_compose.py b/benchmarks/record_batch_compose.py
new file mode 100644
index 0000000..86012df
--- /dev/null
+++ b/benchmarks/record_batch_compose.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python3
+from __future__ import print_function
+import hashlib
+import itertools
+import os
+import random
+
+import perf
+
+from kafka.record.memory_records import MemoryRecordsBuilder
+
+
+DEFAULT_BATCH_SIZE = 1600 * 1024
+KEY_SIZE = 6
+VALUE_SIZE = 60
+TIMESTAMP_RANGE = [1505824130000, 1505824140000]
+
+# With the values above, a v1 record is 100 bytes, so 100 messages take ~10 000 bytes
+MESSAGES_PER_BATCH = 100
+
+
+def random_bytes(length):
+    buffer = bytearray(length)
+    for i in range(length):
+        buffer[i] = random.randint(0, 255)
+    return bytes(buffer)
+
+
+def prepare():
+    return iter(itertools.cycle([
+        (random_bytes(KEY_SIZE),
+         random_bytes(VALUE_SIZE),
+         random.randint(*TIMESTAMP_RANGE)
+         )
+        for _ in range(int(MESSAGES_PER_BATCH * 1.94))
+    ]))
+
+
+def finalize(results):
+    # Consume the results so that PyPy actually executes the main code
+    # instead of optimizing it away.
+    hash_val = hashlib.md5()
+    for buf in results:
+        hash_val.update(buf)
+    print(hash_val, file=open(os.devnull, "w"))
+
+
+def func(loops, magic):
+    # The JIT can optimize out the whole function if the result is the same
+    # each time, so we need some randomized input data.
+    precomputed_samples = prepare()
+    results = []
+
+    # Main benchmark code.
+    t0 = perf.perf_counter()
+    for _ in range(loops):
+        batch = MemoryRecordsBuilder(
+            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
+        for _ in range(MESSAGES_PER_BATCH):
+            key, value, timestamp = next(precomputed_samples)
+            size = batch.append(timestamp=timestamp, key=key, value=value)
+            assert size
+        batch.close()
+        results.append(batch.buffer())
+
+    res = perf.perf_counter() - t0
+
+    finalize(results)
+
+    return res
+
+
+runner = perf.Runner()
+runner.bench_time_func('batch_append_v0', func, 0)
+runner.bench_time_func('batch_append_v1', func, 1)
diff --git a/benchmarks/record_batch_read.py b/benchmarks/record_batch_read.py
new file mode 100644
index 0000000..7ae471e
--- /dev/null
+++ b/benchmarks/record_batch_read.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+from __future__ import print_function
+import hashlib
+import itertools
+import os
+import random
+
+import perf
+
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
+
+
+DEFAULT_BATCH_SIZE = 1600 * 1024
+KEY_SIZE = 6
+VALUE_SIZE = 60
+TIMESTAMP_RANGE = [1505824130000, 1505824140000]
+
+BATCH_SAMPLES = 5
+MESSAGES_PER_BATCH = 100
+
+
+def random_bytes(length):
+    buffer = bytearray(length)
+    for i in range(length):
+        buffer[i] = random.randint(0, 255)
+    return bytes(buffer)
+
+
+def prepare(magic):
+    samples = []
+    for _ in range(BATCH_SAMPLES):
+        batch = MemoryRecordsBuilder(
+            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
+        for _ in range(MESSAGES_PER_BATCH):
+            size = batch.append(
+                random.randint(*TIMESTAMP_RANGE),
+                random_bytes(KEY_SIZE),
+                random_bytes(VALUE_SIZE))
+            assert size
+        batch.close()
+        samples.append(bytes(batch.buffer()))
+
+    return iter(itertools.cycle(samples))
+
+
+def finalize(results):
+    # Consume the results so that PyPy actually executes the code above
+    # instead of optimizing it away.
+    hash_val = hashlib.md5()
+    for buf in results:
+        hash_val.update(buf)
+    print(hash_val, file=open(os.devnull, "w"))
+
+
+def func(loops, magic):
+    # The JIT can optimize out the whole function if the result is the same
+    # each time, so we need some randomized input data.
+    precomputed_samples = prepare(magic)
+    results = []
+
+    # Main benchmark code.
+    batch_data = next(precomputed_samples)
+    t0 = perf.perf_counter()
+    for _ in range(loops):
+        records = MemoryRecords(batch_data)
+        while records.has_next():
+            batch = records.next_batch()
+            batch.validate_crc()
+            for record in batch:
+                results.append(record.value)
+
+    res = perf.perf_counter() - t0
+    finalize(results)
+
+    return res
+
+
+runner = perf.Runner()
+runner.bench_time_func('batch_read_v0', func, 0)
+runner.bench_time_func('batch_read_v1', func, 1)
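To run these benchmarks locally, something along these lines should work; this is a sketch assuming the ``perf`` library is installed from PyPI and that its standard CLI applies (the ``system tune`` step is what perf's documentation recommends for stable results):

pip install perf
python3 -m perf system tune        # optional: reduce system jitter (per perf docs)
python3 benchmarks/record_batch_compose.py
python3 benchmarks/record_batch_read.py

Each script registers two time functions with perf.Runner (one per magic version), so a single invocation benchmarks both the v0 and v1 record formats.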