#!/usr/bin/env python
"""
An example of benchmarking cache read and write speeds with semi-randomized response content

Usage (optionally for a specific backend and/or serializer):
```
python benchmark.py -b <backend> -s <serializer>
```
"""
from argparse import ArgumentParser
from os import urandom
from random import random
from time import perf_counter as time

import requests
from rich import print
from rich.progress import Progress

from requests_cache import CachedResponse, CachedSession

BASE_RESPONSE = requests.get('https://httpbin.org/get')
CACHE_NAME = 'rubbish_bin'
WARMUP_ITERATIONS = 100
ITERATIONS = 5000
MAX_RESPONSE_SIZE = 1024 * 350

# Defaults for DynamoDB
AWS_OPTIONS = {
    'endpoint_url': 'http://localhost:8000',
    'region_name': 'us-east-1',
    'aws_access_key_id': 'placeholder',
    'aws_secret_access_key': 'placeholder',
}
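# NOTE: these defaults assume a DynamoDB instance listening locally on port 8000,
# e.g. DynamoDB Local (typically started with something like
# `docker run -p 8000:8000 amazon/dynamodb-local`)

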
def test_write_speed(session, max_size):
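    """Measure cache write speed: save ITERATIONS randomized responses and report the
    total and average time per write. An untimed warmup pass runs first, so that
    one-time setup costs (e.g. opening connections) don't skew the results.
    """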
    # Untimed warmup writes
    for i in range(WARMUP_ITERATIONS):
        new_response = get_randomized_response(i, max_size)
        session.cache.save_response(new_response)

    with Progress() as progress:
        task = progress.add_task('[cyan]Testing write speed...', total=ITERATIONS)
        start = time()
        for i in range(ITERATIONS):
            new_response = get_randomized_response(i, max_size)
            session.cache.save_response(new_response)
            progress.update(task, advance=1)

    elapsed = time() - start
    avg = (elapsed / ITERATIONS) * 1000
    print(f'[cyan]Elapsed: [green]{elapsed:.3f}[/] seconds (avg [green]{avg:.3f}[/] ms per write)')


def test_read_speed(session):
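    """Measure cache read speed: fetch ITERATIONS responses, cycling through the keys
    written by test_write_speed, and report the total and average time per read,
    again with an untimed warmup pass first.
    """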
    keys = list(session.cache.responses.keys())

    # Untimed warmup reads
    for i in range(WARMUP_ITERATIONS):
        key = keys[i % len(keys)]
        session.cache.get_response(key)

    with Progress() as progress:
        task = progress.add_task('[cyan]Testing read speed...', total=ITERATIONS)
        start = time()
        for i in range(ITERATIONS):
            key = keys[i % len(keys)]
            session.cache.get_response(key)
            progress.update(task, advance=1)

    elapsed = time() - start
    avg = (elapsed / ITERATIONS) * 1000
    print(f'[cyan]Elapsed: [green]{elapsed:.3f}[/] seconds (avg [green]{avg:.3f}[/] ms per read)')


def get_randomized_response(i=0, max_size=MAX_RESPONSE_SIZE):
"""Get a response with randomized content"""
new_response = CachedResponse.from_response(BASE_RESPONSE)
n_bytes = int(random() * max_size)
new_response._content = urandom(n_bytes)
new_response.request.url += f'/response_{i}'
return new_response
if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('-b', '--backend', default='sqlite')
    parser.add_argument('-s', '--serializer', default='pickle')
    parser.add_argument('-m', '--max-size', default=MAX_RESPONSE_SIZE, type=float)
    args = parser.parse_args()
    print(f'[cyan]Benchmarking {args.backend} backend with {args.serializer} serializer')
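    # Backend-specific session options: DynamoDB needs connection settings, and
    # 'sqlite-memory' is shorthand for the SQLite backend in in-memory mode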
    kwargs = {}
    if args.backend == 'dynamodb':
        kwargs = AWS_OPTIONS
    elif args.backend == 'sqlite-memory':
        args.backend = 'sqlite'
        kwargs = {'use_memory': True}

    session = CachedSession(
        CACHE_NAME,
        backend=args.backend,
        serializer=args.serializer,
        **kwargs,
    )
    test_write_speed(session, args.max_size)
    test_read_speed(session)