summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNikita Romaniuk <nikkonrom@gmail.com>2020-10-20 02:57:03 +0300
committerGitHub <noreply@github.com>2020-10-20 06:57:03 +0700
commit2da957a68da218db0ecc75786d66c2128cff400e (patch)
treea12ffb74b5990b7ddef1851dec9ad3b8b55ce4f0
parent9adcd7e50c511ceba622ba20c826a0b701a917ea (diff)
downloadrq-2da957a68da218db0ecc75786d66c2128cff400e.tar.gz
scheduler: now operates with chunks of jobs (#1355)
* scheduler: now operates with chunks of jobs * scheduler: set default chunk_size for ScheduledJobRegistry.get_jobs_to_schedule * scheduler: fixed missing indent * scheduler: added test for get_jobs_to_schedule() with chunk_size parameter * scheduler: fixed test for passing python 3.5 (no f-strings) * scheduler: fixed chunk_size in test make it lighter to run
-rw-r--r--rq/registry.py4
-rw-r--r--rq/scheduler.py11
-rw-r--r--tests/test_scheduler.py20
3 files changed, 24 insertions, 11 deletions
diff --git a/rq/registry.py b/rq/registry.py
index 579d0c5..5a5c078 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -286,11 +286,11 @@ class ScheduledJobRegistry(BaseRegistry):
score = timestamp if timestamp is not None else current_timestamp()
return connection.zremrangebyscore(self.key, 0, score)
- def get_jobs_to_schedule(self, timestamp=None):
+ def get_jobs_to_schedule(self, timestamp=None, chunk_size=1000):
"""Remove jobs whose timestamp is in the past from registry."""
score = timestamp if timestamp is not None else current_timestamp()
return [as_text(job_id) for job_id in
- self.connection.zrangebyscore(self.key, 0, score)]
+ self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size)]
def get_scheduled_time(self, job_or_id):
"""Returns datetime (UTC) at which job is scheduled to be enqueued"""
diff --git a/rq/scheduler.py b/rq/scheduler.py
index b55f728..4d84b48 100644
--- a/rq/scheduler.py
+++ b/rq/scheduler.py
@@ -28,7 +28,6 @@ setup_loghandlers(
class RQScheduler(object):
-
# STARTED: scheduler has been started but sleeping
# WORKING: scheduler is in the midst of scheduling jobs
# STOPPED: scheduler is in stopped condition
@@ -137,11 +136,11 @@ class RQScheduler(object):
queue = Queue(registry.name, connection=self.connection)
with self.connection.pipeline() as pipeline:
- # This should be done in bulk
- for job_id in job_ids:
- job = Job.fetch(job_id, connection=self.connection)
- queue.enqueue_job(job, pipeline=pipeline)
- registry.remove_jobs(timestamp)
+ jobs = Job.fetch_many(job_ids, connection=self.connection)
+ for job in jobs:
+ if job is not None:
+ queue.enqueue_job(job, pipeline=pipeline)
+ registry.remove(job, pipeline=pipeline)
pipeline.execute()
self._status = self.Status.STARTED
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index 480e993..13c68e2 100644
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -35,6 +35,21 @@ class TestScheduledJobRegistry(RQTestCase):
self.assertEqual(registry.get_jobs_to_enqueue(timestamp + 20),
['foo', 'bar'])
+ def test_get_jobs_to_schedule_with_chunk_size(self):
+ """Max amount of jobs returns by get_jobs_to_schedule() equal to chunk_size"""
+ queue = Queue(connection=self.testconn)
+ registry = ScheduledJobRegistry(queue=queue)
+ timestamp = current_timestamp()
+ chunk_size = 5
+
+ for index in range(0, chunk_size * 2):
+ self.testconn.zadd(registry.key, {'foo_{}'.format(index): 1})
+
+ self.assertEqual(len(registry.get_jobs_to_schedule(timestamp, chunk_size)),
+ chunk_size)
+ self.assertEqual(len(registry.get_jobs_to_schedule(timestamp, chunk_size * 2)),
+ chunk_size * 2)
+
def test_get_scheduled_time(self):
"""get_scheduled_time() returns job's scheduled datetime"""
queue = Queue(connection=self.testconn)
@@ -87,7 +102,7 @@ class TestScheduledJobRegistry(RQTestCase):
with mock_tz, mock_day, mock_atz:
registry.schedule(job, datetime(2019, 1, 1))
self.assertEqual(self.testconn.zscore(registry.key, job.id),
- 1546300800 + 18000) # 2019-01-01 UTC in Unix timestamp
+ 1546300800 + 18000) # 2019-01-01 UTC in Unix timestamp
# second, time.daylight != 0 (in DST)
# mock the sitatuoin for American/New_York not in DST (UTC - 4)
@@ -100,8 +115,7 @@ class TestScheduledJobRegistry(RQTestCase):
with mock_tz, mock_day, mock_atz:
registry.schedule(job, datetime(2019, 1, 1))
self.assertEqual(self.testconn.zscore(registry.key, job.id),
- 1546300800 + 14400) # 2019-01-01 UTC in Unix timestamp
-
+ 1546300800 + 14400) # 2019-01-01 UTC in Unix timestamp
# Score is always stored in UTC even if datetime is in a different tz
tz = timezone(timedelta(hours=7))