diff options
| author | Utkarsh Gupta <utkarshgupta137@gmail.com> | 2022-05-08 17:34:20 +0530 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-05-08 15:04:20 +0300 |
| commit | 061d97abe21d3a8ce9738330cabf771dd05c8dc1 (patch) | |
| tree | e64d64c5917312a65304f4d630775d08adb38bfa /benchmarks | |
| parent | c25be04d6468163d31908774ed358d3fd6bc0a39 (diff) | |
| download | redis-py-061d97abe21d3a8ce9738330cabf771dd05c8dc1.tar.gz | |
Add Async RedisCluster (#2099)
* Copy Cluster Client, Commands, Commands Parser, Tests for asyncio
* Async Cluster Tests: Async/Await
* Add Async RedisCluster
* cluster: use ERRORS_ALLOW_RETRY from self.__class__
* async_cluster: rework redis_connection, initialize, & close
- move redis_connection from NodesManager to ClusterNode & handle all related logic in ClusterNode class
- use Locks while initializing or closing
- in case of error, close connections instead of instantly reinitializing
- create ResourceWarning instead of manually deleting client object
- use asyncio.gather to run commands/initialize/close in parallel
- inline single use functions
- fix test_acl_log for py3.6
* async_cluster: add types
* async_cluster: add docs
* docs: update sphinx & add sphinx_autodoc_typehints
* async_cluster: move TargetNodesT to cluster module
* async_cluster/commands: inherit commands from sync class if possible
* async_cluster: add benchmark script with aredis & aioredis-cluster
* async_cluster: remove logging
* async_cluster: inline functions
* async_cluster: manage Connection instead of Redis Client
* async_cluster/commands: optimize parser
* async_cluster: use ensure_future & generators for gather
* async_conn: optimize
* async_cluster: optimize determine_slot
* async_cluster: optimize determine_nodes
* async_cluster/parser: optimize _get_moveable_keys
* async_cluster: inlined check_slots_coverage
* async_cluster: update docstrings
* async_cluster: add concurrent test & use read_response/_update_moved_slots without lock
Co-authored-by: Chayim <chayim@users.noreply.github.com>
Diffstat (limited to 'benchmarks')
| -rw-r--r-- | benchmarks/cluster_async.py | 263 |
1 files changed, 263 insertions, 0 deletions
diff --git a/benchmarks/cluster_async.py b/benchmarks/cluster_async.py new file mode 100644 index 0000000..aec3f1c --- /dev/null +++ b/benchmarks/cluster_async.py @@ -0,0 +1,263 @@ +import asyncio +import functools +import time + +import aioredis_cluster +import aredis +import uvloop + +import redis.asyncio as redispy + + +def timer(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): + tic = time.perf_counter() + await func(*args, **kwargs) + toc = time.perf_counter() + return f"{toc - tic:.4f}" + + return wrapper + + +@timer +async def set_str(client, gather, data): + if gather: + for _ in range(count // 100): + await asyncio.gather( + *( + asyncio.create_task(client.set(f"bench:str_{i}", data)) + for i in range(100) + ) + ) + else: + for i in range(count): + await client.set(f"bench:str_{i}", data) + + +@timer +async def set_int(client, gather, data): + if gather: + for _ in range(count // 100): + await asyncio.gather( + *( + asyncio.create_task(client.set(f"bench:int_{i}", data)) + for i in range(100) + ) + ) + else: + for i in range(count): + await client.set(f"bench:int_{i}", data) + + +@timer +async def get_str(client, gather): + if gather: + for _ in range(count // 100): + await asyncio.gather( + *(asyncio.create_task(client.get(f"bench:str_{i}")) for i in range(100)) + ) + else: + for i in range(count): + await client.get(f"bench:str_{i}") + + +@timer +async def get_int(client, gather): + if gather: + for _ in range(count // 100): + await asyncio.gather( + *(asyncio.create_task(client.get(f"bench:int_{i}")) for i in range(100)) + ) + else: + for i in range(count): + await client.get(f"bench:int_{i}") + + +@timer +async def hset(client, gather, data): + if gather: + for _ in range(count // 100): + await asyncio.gather( + *( + asyncio.create_task(client.hset("bench:hset", str(i), data)) + for i in range(100) + ) + ) + else: + for i in range(count): + await client.hset("bench:hset", str(i), data) + + +@timer +async def hget(client, gather): + if gather: + for _ in range(count // 100): + await asyncio.gather( + *( + asyncio.create_task(client.hget("bench:hset", str(i))) + for i in range(100) + ) + ) + else: + for i in range(count): + await client.hget("bench:hset", str(i)) + + +@timer +async def incr(client, gather): + if gather: + for _ in range(count // 100): + await asyncio.gather( + *(asyncio.create_task(client.incr("bench:incr")) for i in range(100)) + ) + else: + for i in range(count): + await client.incr("bench:incr") + + +@timer +async def lpush(client, gather, data): + if gather: + for _ in range(count // 100): + await asyncio.gather( + *( + asyncio.create_task(client.lpush("bench:lpush", data)) + for i in range(100) + ) + ) + else: + for i in range(count): + await client.lpush("bench:lpush", data) + + +@timer +async def lrange_300(client, gather): + if gather: + for _ in range(count // 100): + await asyncio.gather( + *( + asyncio.create_task(client.lrange("bench:lpush", i, i + 300)) + for i in range(100) + ) + ) + else: + for i in range(count): + await client.lrange("bench:lpush", i, i + 300) + + +@timer +async def lpop(client, gather): + if gather: + for _ in range(count // 100): + await asyncio.gather( + *(asyncio.create_task(client.lpop("bench:lpush")) for i in range(100)) + ) + else: + for i in range(count): + await client.lpop("bench:lpush") + + +@timer +async def warmup(client): + await asyncio.gather( + *(asyncio.create_task(client.exists(f"bench:warmup_{i}")) for i in range(100)) + ) + + +@timer +async def run(client, gather): + data_str = "a" * size + data_int = int("1" * size) + + if gather is False: + for ret in await asyncio.gather( + asyncio.create_task(set_str(client, gather, data_str)), + asyncio.create_task(set_int(client, gather, data_int)), + asyncio.create_task(hset(client, gather, data_str)), + asyncio.create_task(incr(client, gather)), + asyncio.create_task(lpush(client, gather, data_int)), + ): + print(ret) + for ret in await asyncio.gather( + asyncio.create_task(get_str(client, gather)), + asyncio.create_task(get_int(client, gather)), + asyncio.create_task(hget(client, gather)), + asyncio.create_task(lrange_300(client, gather)), + asyncio.create_task(lpop(client, gather)), + ): + print(ret) + else: + print(await set_str(client, gather, data_str)) + print(await set_int(client, gather, data_int)) + print(await hset(client, gather, data_str)) + print(await incr(client, gather)) + print(await lpush(client, gather, data_int)) + + print(await get_str(client, gather)) + print(await get_int(client, gather)) + print(await hget(client, gather)) + print(await lrange_300(client, gather)) + print(await lpop(client, gather)) + + +async def main(loop, gather=None): + arc = aredis.StrictRedisCluster( + host=host, + port=port, + password=password, + max_connections=2 ** 31, + max_connections_per_node=2 ** 31, + readonly=False, + reinitialize_steps=count, + skip_full_coverage_check=True, + decode_responses=False, + max_idle_time=count, + idle_check_interval=count, + ) + print(f"{loop} {gather} {await warmup(arc)} aredis") + print(await run(arc, gather=gather)) + arc.connection_pool.disconnect() + + aiorc = await aioredis_cluster.create_redis_cluster( + [(host, port)], + password=password, + state_reload_interval=count, + idle_connection_timeout=count, + pool_maxsize=2 ** 31, + ) + print(f"{loop} {gather} {await warmup(aiorc)} aioredis-cluster") + print(await run(aiorc, gather=gather)) + aiorc.close() + await aiorc.wait_closed() + + async with redispy.RedisCluster( + host=host, + port=port, + password=password, + reinitialize_steps=count, + read_from_replicas=False, + decode_responses=False, + max_connections=2 ** 31, + ) as rca: + print(f"{loop} {gather} {await warmup(rca)} redispy") + print(await run(rca, gather=gather)) + + +if __name__ == "__main__": + host = "localhost" + port = 16379 + password = None + + count = 1000 + size = 16 + + asyncio.run(main("asyncio")) + asyncio.run(main("asyncio", gather=False)) + asyncio.run(main("asyncio", gather=True)) + + uvloop.install() + + asyncio.run(main("uvloop")) + asyncio.run(main("uvloop", gather=False)) + asyncio.run(main("uvloop", gather=True)) |
