# path: root/tests/test_results.py
# blob: 4286cec681313509a764d4ed2ae7ffe45657c055
import unittest
import tempfile

from datetime import timedelta
from unittest.mock import patch, PropertyMock

from redis import Redis

from tests import RQTestCase

from rq.defaults import UNSERIALIZABLE_RETURN_VALUE_PAYLOAD
from rq.job import Job
from rq.queue import Queue
from rq.registry import StartedJobRegistry
from rq.results import Result, get_key
from rq.utils import get_version, utcnow
from rq.worker import Worker

from .fixtures import say_hello, div_by_zero


@unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0')
class TestScheduledJobRegistry(RQTestCase):
    """Tests for ``rq.results.Result`` and the result accessors on ``Job``.

    The whole class is skipped when the Redis server reached through a
    default ``Redis()`` connection reports a version below 5.0.

    NOTE(review): the class name mentions the scheduled job registry, but
    every test here exercises ``Result``; the name is kept unchanged so
    existing test IDs keep working.
    """

    def _assert_result_ttl(self, job):
        """Assert the result key of `job` expires in more than 5s and at most 10s.

        The tests that use this helper create their results with ``ttl=10``;
        PTTL reports the remaining time in milliseconds.
        """
        ttl = self.connection.pttl(get_key(job.id))
        self.assertGreater(ttl, 5000)
        self.assertLessEqual(ttl, 10000)

    def test_save_and_get_result(self):
        """Result.create() stores a value that fetch_latest() returns, with a TTL"""
        queue = Queue(connection=self.connection)
        job = queue.enqueue(say_hello)

        # Nothing has been stored for this job yet
        self.assertIsNone(Result.fetch_latest(job))

        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
        result = Result.fetch_latest(job)
        self.assertEqual(result.return_value, 1)
        self.assertEqual(job.latest_result().return_value, 1)

        # Check that ttl is properly set
        self._assert_result_ttl(job)

        # Check job with None return value
        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=None)
        result = Result.fetch_latest(job)
        self.assertIsNone(result.return_value)

        # A later result takes over as the latest one
        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=2)
        result = Result.fetch_latest(job)
        self.assertEqual(result.return_value, 2)

    def test_create_failure(self):
        """Result.create_failure() stores the exception string, with a TTL"""
        queue = Queue(connection=self.connection)
        job = queue.enqueue(say_hello)
        Result.create_failure(job, ttl=10, exc_string='exception')
        result = Result.fetch_latest(job)
        self.assertEqual(result.exc_string, 'exception')

        # Check that ttl is properly set
        self._assert_result_ttl(job)

    def test_getting_results(self):
        """Check getting all execution results"""
        queue = Queue(connection=self.connection)
        job = queue.enqueue(say_hello)

        # latest_result() returns None when there's no result
        self.assertIsNone(job.latest_result())

        result_1 = Result.create_failure(job, ttl=10, exc_string='exception')
        result_2 = Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
        result_3 = Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)

        # Result.fetch_latest() returns the latest result
        result = Result.fetch_latest(job)
        self.assertEqual(result, result_3)
        self.assertEqual(job.latest_result(), result_3)

        # Result.all() and job.results() return all results, newest first
        results = Result.all(job)
        self.assertEqual(results, [result_3, result_2, result_1])
        self.assertEqual(job.results(), [result_3, result_2, result_1])

    def test_count(self):
        """Result.count(job) returns number of results"""
        queue = Queue(connection=self.connection)
        job = queue.enqueue(say_hello)
        self.assertEqual(Result.count(job), 0)
        Result.create_failure(job, ttl=10, exc_string='exception')
        self.assertEqual(Result.count(job), 1)
        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
        self.assertEqual(Result.count(job), 2)

    def test_delete_all(self):
        """Result.delete_all(job) deletes all results from Redis"""
        queue = Queue(connection=self.connection)
        job = queue.enqueue(say_hello)
        Result.create_failure(job, ttl=10, exc_string='exception')
        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
        Result.delete_all(job)
        self.assertEqual(Result.count(job), 0)

    def test_job_successful_result_fallback(self):
        """Changes to job.result handling should be backwards compatible."""
        queue = Queue(connection=self.connection)
        job = queue.enqueue(say_hello)
        worker = Worker([queue])
        worker.register_birth()

        self.assertEqual(worker.failed_job_count, 0)
        self.assertEqual(worker.successful_job_count, 0)
        self.assertEqual(worker.total_working_time, 0)

        # This first half runs with Redis streams support left unpatched
        registry = StartedJobRegistry(connection=self.connection)
        job.started_at = utcnow()
        job.ended_at = job.started_at + timedelta(seconds=0.75)
        job._result = 'Success'
        worker.handle_job_success(job, queue, registry)

        # The job hash must not carry a `result` field in this mode
        payload = self.connection.hgetall(job.key)
        self.assertNotIn(b'result', payload)
        self.assertEqual(job.result, 'Success')

        with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock:
            with patch('rq.job.Job.supports_redis_streams', new_callable=PropertyMock) as job_mock:
                job_mock.return_value = False
                mock.return_value = False
                worker = Worker([queue])
                worker.register_birth()
                job = queue.enqueue(say_hello)
                job._result = 'Success'
                job.started_at = utcnow()
                job.ended_at = job.started_at + timedelta(seconds=0.75)

                # If `save_result_to_job` = True, result will be saved to job
                # hash, simulating older versions of RQ

                worker.handle_job_success(job, queue, registry)
                payload = self.connection.hgetall(job.key)
                self.assertIn(b'result', payload)
                # Delete all new result objects so we only have result stored in job hash,
                # this should simulate a job that was executed in an earlier RQ version
                Result.delete_all(job)
                self.assertEqual(job.result, 'Success')

    def test_job_failed_result_fallback(self):
        """Changes to job.result failure handling should be backwards compatible."""
        queue = Queue(connection=self.connection)
        job = queue.enqueue(say_hello)
        worker = Worker([queue])
        worker.register_birth()

        self.assertEqual(worker.failed_job_count, 0)
        self.assertEqual(worker.successful_job_count, 0)
        self.assertEqual(worker.total_working_time, 0)

        # This first half runs with Redis streams support left unpatched
        registry = StartedJobRegistry(connection=self.connection)
        job.started_at = utcnow()
        job.ended_at = job.started_at + timedelta(seconds=0.75)
        worker.handle_job_failure(job, exc_string='Error', queue=queue,
                                  started_job_registry=registry)

        # The job hash must not carry an `exc_info` field in this mode, yet
        # the freshly fetched job still exposes the exception string
        job = Job.fetch(job.id, connection=self.connection)
        payload = self.connection.hgetall(job.key)
        self.assertNotIn(b'exc_info', payload)
        self.assertEqual(job.exc_info, 'Error')

        with patch('rq.worker.Worker.supports_redis_streams', new_callable=PropertyMock) as mock:
            with patch('rq.job.Job.supports_redis_streams', new_callable=PropertyMock) as job_mock:
                job_mock.return_value = False
                mock.return_value = False
                worker = Worker([queue])
                worker.register_birth()

                job = queue.enqueue(say_hello)
                job.started_at = utcnow()
                job.ended_at = job.started_at + timedelta(seconds=0.75)

                # If `save_result_to_job` = True, result will be saved to job
                # hash, simulating older versions of RQ

                worker.handle_job_failure(job, exc_string='Error', queue=queue,
                                          started_job_registry=registry)
                payload = self.connection.hgetall(job.key)
                self.assertIn(b'exc_info', payload)
                # Delete all new result objects so we only have result stored in job hash,
                # this should simulate a job that was executed in an earlier RQ version
                Result.delete_all(job)
                job = Job.fetch(job.id, connection=self.connection)
                self.assertEqual(job.exc_info, 'Error')

    def test_job_return_value(self):
        """Test job.return_value"""
        queue = Queue(connection=self.connection)
        job = queue.enqueue(say_hello)

        # Returns None when there's no result
        self.assertIsNone(job.return_value())

        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
        self.assertEqual(job.return_value(), 1)

        # Returns None if latest result is a failure
        Result.create_failure(job, ttl=10, exc_string='exception')
        self.assertIsNone(job.return_value(refresh=True))

    def test_job_return_value_sync(self):
        """Test job.return_value when queue.is_async=False"""
        queue = Queue(connection=self.connection, is_async=False)
        job = queue.enqueue(say_hello)

        # On a non-async queue a return value is already available
        # right after enqueue()
        self.assertIsNotNone(job.return_value())

        # A job that raises is recorded as a FAILED result
        job = queue.enqueue(div_by_zero)
        self.assertEqual(job.latest_result().type, Result.Type.FAILED)

    def test_job_return_value_result_ttl_infinity(self):
        """Test job.return_value when queue.result_ttl=-1"""
        queue = Queue(connection=self.connection, result_ttl=-1)
        job = queue.enqueue(say_hello)

        # Returns None when there's no result
        self.assertIsNone(job.return_value())

        Result.create(job, Result.Type.SUCCESSFUL, ttl=-1, return_value=1)
        self.assertEqual(job.return_value(), 1)

    def test_job_return_value_result_ttl_zero(self):
        """Test job.return_value when queue.result_ttl=0"""
        queue = Queue(connection=self.connection, result_ttl=0)
        job = queue.enqueue(say_hello)

        # Returns None when there's no result
        self.assertIsNone(job.return_value())

        # A result created with ttl=0 is not retrievable afterwards
        Result.create(job, Result.Type.SUCCESSFUL, ttl=0, return_value=1)
        self.assertIsNone(job.return_value())

    def test_job_return_value_unserializable(self):
        """Test job.return_value when it is not serializable"""
        queue = Queue(connection=self.connection, result_ttl=0)
        job = queue.enqueue(say_hello)

        # Returns None when there's no result
        self.assertIsNone(job.return_value())

        # tempfile.NamedTemporaryFile() is not picklable; the context manager
        # closes (and removes) the temporary file once the result is stored
        with tempfile.NamedTemporaryFile() as unpicklable:
            Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=unpicklable)
        self.assertEqual(job.return_value(), UNSERIALIZABLE_RETURN_VALUE_PAYLOAD)
        self.assertEqual(Result.count(job), 1)

        Result.create(job, Result.Type.SUCCESSFUL, ttl=10, return_value=1)
        self.assertEqual(Result.count(job), 2)