summaryrefslogtreecommitdiff
path: root/tests/test_connection.py
blob: fdfafbd014265078c468b73f590a7456dcd1cad5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from redis import Redis

from rq import Connection, Queue, use_connection, get_current_connection, pop_connection
from rq.connections import NoRedisConnectionException

from tests import find_empty_redis_database, RQTestCase
from tests.fixtures import do_nothing


def new_connection():
    """Return a fresh connection obtained from ``find_empty_redis_database()``.

    Thin convenience wrapper so the tests below can ask for "another"
    connection without referring to the helper by its full name.
    """
    connection = find_empty_redis_database()
    return connection


class TestConnectionInheritance(RQTestCase):
    """Checks how queues and jobs pick up the Redis connection they use."""

    def test_connection_detection(self):
        """A queue created without arguments uses the test case connection."""
        queue = Queue()
        self.assertEqual(queue.connection, self.testconn)

    def test_connection_stacking(self):
        """Queues created in nested Connection contexts get distinct connections."""
        outer_conn, inner_conn = Redis(db=4), Redis(db=5)

        with Connection(outer_conn):
            outer_queue = Queue()
            with Connection(inner_conn):
                inner_queue = Queue()

        # Each queue was created under a different context, so the
        # connections they ended up with must differ.
        self.assertNotEqual(outer_queue.connection, inner_queue.connection)

    def test_connection_pass_thru(self):
        """Jobs enqueued on a queue carry that queue's connection."""
        default_queue = Queue()
        with Connection(new_connection()):
            scoped_queue = Queue()

        # Enqueue on both queues first, then compare each job with the
        # queue it was enqueued on.
        queues = (default_queue, scoped_queue)
        jobs = [queue.enqueue(do_nothing) for queue in queues]
        for queue, job in zip(queues, jobs):
            self.assertEqual(queue.connection, job.connection)


class TestConnectionHelpers(RQTestCase):
    """Tests for the connection helper functions.

    These tests deliberately replace or remove the current connection, so
    each one puts ``self.testconn`` back in a ``finally`` clause. Without
    that, a failing assertion (or simply the order in which tests run) could
    leave later tests without the connection they expect.
    """

    def test_use_connection(self):
        """Test function use_connection works as expected."""
        conn = new_connection()
        try:
            use_connection(conn)
            self.assertEqual(conn, get_current_connection())

            # Called without an argument, use_connection() must switch to a
            # connection other than the one selected above.
            use_connection()
            self.assertNotEqual(conn, get_current_connection())
        finally:
            # Restore RQTestCase connection, even when an assertion above
            # fails, so one failure cannot cascade into unrelated tests.
            use_connection(self.testconn)

        # Mixing use_connection() with Connection contexts is expected to
        # trip an assertion.
        # NOTE(review): presumably the first use_connection() call already
        # raises, leaving the inner context unreached -- confirm against
        # rq.connections.
        with self.assertRaises(AssertionError):
            with Connection(new_connection()):
                use_connection()
                with Connection(new_connection()):
                    use_connection()

    def test_resolve_connection_raises_on_no_connection(self):
        """Test function resolve_connection raises if there is no connection."""
        # Drop the current connection so that Queue() has nothing to
        # resolve.
        pop_connection()
        try:
            with self.assertRaises(NoRedisConnectionException):
                Queue()
        finally:
            # Restore RQTestCase connection; the original test left it
            # popped, making later tests depend on execution order.
            use_connection(self.testconn)