summaryrefslogtreecommitdiff
path: root/tests/test_cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_cluster.py')
-rw-r--r--tests/test_cluster.py129
1 files changed, 129 insertions, 0 deletions
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 58f9b77..1f037c9 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -1,9 +1,14 @@
import binascii
import datetime
+import select
+import socket
+import socketserver
+import threading
import warnings
from queue import LifoQueue, Queue
from time import sleep
from unittest.mock import DEFAULT, Mock, call, patch
+from urllib.parse import urlparse
import pytest
@@ -53,6 +58,85 @@ default_cluster_slots = [
]
+class ProxyRequestHandler(socketserver.BaseRequestHandler):
+ def recv(self, sock):
+ """A recv with a timeout"""
+ r = select.select([sock], [], [], 0.01)
+ if not r[0]:
+ return None
+ return sock.recv(1000)
+
+ def handle(self):
+ self.server.proxy.n_connections += 1
+ conn = socket.create_connection(self.server.proxy.redis_addr)
+ stop = False
+
+ def from_server():
+ # read from server and pass to client
+ while not stop:
+ data = self.recv(conn)
+ if data is None:
+ continue
+ if not data:
+ self.request.shutdown(socket.SHUT_WR)
+ return
+ self.request.sendall(data)
+
+ thread = threading.Thread(target=from_server)
+ thread.start()
+ try:
+ while True:
+ # read from client and send to server
+ data = self.request.recv(1000)
+ if not data:
+ return
+ conn.sendall(data)
+ finally:
+ conn.shutdown(socket.SHUT_WR)
+ stop = True # for safety
+ thread.join()
+ conn.close()
+
+
+class NodeProxy:
+ """A class to proxy a node connection to a different port"""
+
+ def __init__(self, addr, redis_addr):
+ self.addr = addr
+ self.redis_addr = redis_addr
+ self.server = socketserver.ThreadingTCPServer(self.addr, ProxyRequestHandler)
+ self.server.proxy = self
+ self.server.socket_reuse_address = True
+ self.thread = None
+ self.n_connections = 0
+
+ def start(self):
+ # test that we can connect to redis
+ s = socket.create_connection(self.redis_addr, timeout=2)
+ s.close()
+ # Start a thread with the server -- that thread will then start one
+ # more thread for each request
+ self.thread = threading.Thread(target=self.server.serve_forever)
+ # Exit the server thread when the main thread terminates
+ self.thread.daemon = True
+ self.thread.start()
+
+ def close(self):
+ self.server.shutdown()
+
+
+@pytest.fixture
+def redis_addr(request):
+ redis_url = request.config.getoption("--redis-url")
+ scheme, netloc = urlparse(redis_url)[:2]
+ assert scheme == "redis"
+ if ":" in netloc:
+ host, port = netloc.split(":")
+ return host, int(port)
+ else:
+ return netloc, 6379
+
+
@pytest.fixture()
def slowlog(request, r):
"""
@@ -823,6 +907,51 @@ class TestRedisClusterObj:
assert "myself" not in nodes.get(curr_default_node.name).get("flags")
assert r.get_default_node() != curr_default_node
+ def test_address_remap(self, request, redis_addr):
+ """Test that we can create a rediscluster object with
+ a host-port remapper and map connections through proxy objects
+ """
+
+ # we remap the first n nodes
+ offset = 1000
+ n = 6
+ ports = [redis_addr[1] + i for i in range(n)]
+
+ def address_remap(address):
+ # remap first three nodes to our local proxy
+ # old = host, port
+ host, port = address
+ if int(port) in ports:
+ host, port = "127.0.0.1", int(port) + offset
+ # print(f"{old} {host, port}")
+ return host, port
+
+ # create the proxies
+ proxies = [
+ NodeProxy(("127.0.0.1", port + offset), (redis_addr[0], port))
+ for port in ports
+ ]
+ for p in proxies:
+ p.start()
+ try:
+ # create cluster:
+ r = _get_client(
+ RedisCluster, request, flushdb=False, address_remap=address_remap
+ )
+ try:
+ assert r.ping() is True
+ assert r.set("byte_string", b"giraffe")
+ assert r.get("byte_string") == b"giraffe"
+ finally:
+ r.close()
+ finally:
+ for p in proxies:
+ p.close()
+
+ # verify that the proxies were indeed used
+ n_used = sum((1 if p.n_connections else 0) for p in proxies)
+ assert n_used > 1
+
@pytest.mark.onlycluster
class TestClusterRedisCommands: