summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSelwin Ong <selwin.ong@gmail.com>2021-08-24 18:52:02 +0700
committerSelwin Ong <selwin.ong@gmail.com>2021-08-24 18:52:02 +0700
commit2429ac84e56aa8fe2f9c8bbf2c8084964457f02e (patch)
tree1d366b4591d3523fcdae012708e52c839349a66e
parentb80045d615d539cf4c565c5541694efe13ea648e (diff)
downloadrq-fix-race-condition.tar.gz
Main worker should use zadd(xx=True) to update heartbeat.fix-race-condition
-rw-r--r--rq/job.py4
-rw-r--r--rq/registry.py6
-rw-r--r--rq/worker.py2
-rw-r--r--tests/test_cli.py8
4 files changed, 9 insertions, 11 deletions
diff --git a/rq/job.py b/rq/job.py
index ebb87c1..6d5c3bf 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -440,11 +440,11 @@ class Job:
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value
- def heartbeat(self, timestamp, ttl, pipeline=None):
+ def heartbeat(self, timestamp, ttl, pipeline=None, xx=False):
self.last_heartbeat = timestamp
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
- self.started_job_registry.add(self, ttl, pipeline=pipeline)
+ self.started_job_registry.add(self, ttl, pipeline=pipeline, xx=xx)
id = property(get_id, set_id)
diff --git a/rq/registry.py b/rq/registry.py
index ac8bca7..ba1a1dd 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -61,15 +61,15 @@ class BaseRegistry:
self.cleanup()
return self.connection.zcard(self.key)
- def add(self, job, ttl=0, pipeline=None):
+ def add(self, job, ttl=0, pipeline=None, xx=False):
"""Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf"""
score = ttl if ttl < 0 else current_timestamp() + ttl
if score == -1:
score = '+inf'
if pipeline is not None:
- return pipeline.zadd(self.key, {job.id: score})
+ return pipeline.zadd(self.key, {job.id: score}, xx=xx)
- return self.connection.zadd(self.key, {job.id: score})
+ return self.connection.zadd(self.key, {job.id: score}, xx=xx)
def remove(self, job, pipeline=None, delete_job=False):
"""Removes job from registry and deletes it if `delete_job == True`"""
diff --git a/rq/worker.py b/rq/worker.py
index 46ace25..e745eef 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -814,7 +814,7 @@ class Worker:
with self.connection.pipeline() as pipeline:
self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
ttl = self.get_heartbeat_ttl(job)
- job.heartbeat(utcnow(), ttl, pipeline=pipeline)
+ job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True)
pipeline.execute()
except OSError as e:
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 72fc510..a03aaf6 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -449,12 +449,10 @@ class TestRQCli(RQTestCase):
prefix = 'Enqueued tests.fixtures.say_hello() with job-id \''
suffix = '\'.\n'
- print(result.stdout)
-
- self.assertTrue(result.stdout.startswith(prefix))
- self.assertTrue(result.stdout.endswith(suffix))
+ self.assertTrue(result.output.startswith(prefix))
+ self.assertTrue(result.output.endswith(suffix))
- job_id = result.stdout[len(prefix):-len(suffix)]
+ job_id = result.output[len(prefix):-len(suffix)]
queue_key = 'rq:queue:default'
self.assertEqual(self.connection.llen(queue_key), 1)
self.assertEqual(self.connection.lrange(queue_key, 0, -1)[0].decode('ascii'), job_id)