summaryrefslogtreecommitdiff
path: root/benchmarks/cluster_async_pipeline.py
blob: af45b445118c7184d810cba5f499e48a7a843d9a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import asyncio
import functools
import time

import aioredis_cluster
import aredis
import uvloop

import redis.asyncio as redispy


def timer(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        tic = time.perf_counter()
        await func(*args, **kwargs)
        toc = time.perf_counter()
        return f"{toc - tic:.4f}"

    return wrapper


@timer
async def warmup(client):
    await asyncio.gather(
        *(asyncio.create_task(client.exists(f"bench:warmup_{i}")) for i in range(100))
    )


@timer
async def run(client):
    data_str = "a" * size
    data_int = int("1" * size)

    for i in range(count):
        with client.pipeline() as pipe:
            await (
                pipe.set(f"bench:str_{i}", data_str)
                .set(f"bench:int_{i}", data_int)
                .get(f"bench:str_{i}")
                .get(f"bench:int_{i}")
                .hset("bench:hset", str(i), data_str)
                .hget("bench:hset", str(i))
                .incr("bench:incr")
                .lpush("bench:lpush", data_int)
                .lrange("bench:lpush", 0, 300)
                .lpop("bench:lpush")
                .execute()
            )


async def main(loop):
    arc = aredis.StrictRedisCluster(
        host=host,
        port=port,
        password=password,
        max_connections=2**31,
        max_connections_per_node=2**31,
        readonly=False,
        reinitialize_steps=count,
        skip_full_coverage_check=True,
        decode_responses=False,
        max_idle_time=count,
        idle_check_interval=count,
    )
    print(f"{loop} {await warmup(arc)} aredis")
    print(await run(arc))
    arc.connection_pool.disconnect()

    aiorc = await aioredis_cluster.create_redis_cluster(
        [(host, port)],
        password=password,
        state_reload_interval=count,
        idle_connection_timeout=count,
        pool_maxsize=2**31,
    )
    print(f"{loop} {await warmup(aiorc)} aioredis-cluster")
    print(await run(aiorc))
    aiorc.close()
    await aiorc.wait_closed()

    async with redispy.RedisCluster(
        host=host,
        port=port,
        password=password,
        reinitialize_steps=count,
        read_from_replicas=False,
        decode_responses=False,
        max_connections=2**31,
    ) as rca:
        print(f"{loop} {await warmup(rca)} redispy")
        print(await run(rca))


if __name__ == "__main__":
    host = "localhost"
    port = 16379
    password = None

    count = 10000
    size = 256

    asyncio.run(main("asyncio"))

    uvloop.install()

    asyncio.run(main("uvloop"))