diff options
author | Nikita Romaniuk <nikkonrom@gmail.com> | 2020-10-20 02:57:03 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-20 06:57:03 +0700 |
commit | 2da957a68da218db0ecc75786d66c2128cff400e (patch) | |
tree | a12ffb74b5990b7ddef1851dec9ad3b8b55ce4f0 | |
parent | 9adcd7e50c511ceba622ba20c826a0b701a917ea (diff) | |
download | rq-2da957a68da218db0ecc75786d66c2128cff400e.tar.gz |
scheduler: now operates with chunks of jobs (#1355)
* scheduler: now operates with chunks of jobs
* scheduler: set default chunk_size for ScheduledJobRegistry.get_jobs_to_schedule
* scheduler: fixed missing indent
* scheduler: added test for get_jobs_to_schedule() with chunk_size parameter
* scheduler: fixed test to pass on Python 3.5 (no f-strings)
* scheduler: fixed chunk_size in test to make it lighter to run
-rw-r--r-- | rq/registry.py | 4 | ||||
-rw-r--r-- | rq/scheduler.py | 11 | ||||
-rw-r--r-- | tests/test_scheduler.py | 20 |
3 files changed, 24 insertions, 11 deletions
diff --git a/rq/registry.py b/rq/registry.py index 579d0c5..5a5c078 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -286,11 +286,11 @@ class ScheduledJobRegistry(BaseRegistry): score = timestamp if timestamp is not None else current_timestamp() return connection.zremrangebyscore(self.key, 0, score) - def get_jobs_to_schedule(self, timestamp=None): + def get_jobs_to_schedule(self, timestamp=None, chunk_size=1000): """Remove jobs whose timestamp is in the past from registry.""" score = timestamp if timestamp is not None else current_timestamp() return [as_text(job_id) for job_id in - self.connection.zrangebyscore(self.key, 0, score)] + self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size)] def get_scheduled_time(self, job_or_id): """Returns datetime (UTC) at which job is scheduled to be enqueued""" diff --git a/rq/scheduler.py b/rq/scheduler.py index b55f728..4d84b48 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -28,7 +28,6 @@ setup_loghandlers( class RQScheduler(object): - # STARTED: scheduler has been started but sleeping # WORKING: scheduler is in the midst of scheduling jobs # STOPPED: scheduler is in stopped condition @@ -137,11 +136,11 @@ class RQScheduler(object): queue = Queue(registry.name, connection=self.connection) with self.connection.pipeline() as pipeline: - # This should be done in bulk - for job_id in job_ids: - job = Job.fetch(job_id, connection=self.connection) - queue.enqueue_job(job, pipeline=pipeline) - registry.remove_jobs(timestamp) + jobs = Job.fetch_many(job_ids, connection=self.connection) + for job in jobs: + if job is not None: + queue.enqueue_job(job, pipeline=pipeline) + registry.remove(job, pipeline=pipeline) pipeline.execute() self._status = self.Status.STARTED diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 480e993..13c68e2 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -35,6 +35,21 @@ class TestScheduledJobRegistry(RQTestCase): 
self.assertEqual(registry.get_jobs_to_enqueue(timestamp + 20), ['foo', 'bar']) + def test_get_jobs_to_schedule_with_chunk_size(self): + """Max amount of jobs returns by get_jobs_to_schedule() equal to chunk_size""" + queue = Queue(connection=self.testconn) + registry = ScheduledJobRegistry(queue=queue) + timestamp = current_timestamp() + chunk_size = 5 + + for index in range(0, chunk_size * 2): + self.testconn.zadd(registry.key, {'foo_{}'.format(index): 1}) + + self.assertEqual(len(registry.get_jobs_to_schedule(timestamp, chunk_size)), + chunk_size) + self.assertEqual(len(registry.get_jobs_to_schedule(timestamp, chunk_size * 2)), + chunk_size * 2) + def test_get_scheduled_time(self): """get_scheduled_time() returns job's scheduled datetime""" queue = Queue(connection=self.testconn) @@ -87,7 +102,7 @@ class TestScheduledJobRegistry(RQTestCase): with mock_tz, mock_day, mock_atz: registry.schedule(job, datetime(2019, 1, 1)) self.assertEqual(self.testconn.zscore(registry.key, job.id), - 1546300800 + 18000) # 2019-01-01 UTC in Unix timestamp + 1546300800 + 18000) # 2019-01-01 UTC in Unix timestamp # second, time.daylight != 0 (in DST) # mock the sitatuoin for American/New_York not in DST (UTC - 4) @@ -100,8 +115,7 @@ class TestScheduledJobRegistry(RQTestCase): with mock_tz, mock_day, mock_atz: registry.schedule(job, datetime(2019, 1, 1)) self.assertEqual(self.testconn.zscore(registry.key, job.id), - 1546300800 + 14400) # 2019-01-01 UTC in Unix timestamp - + 1546300800 + 14400) # 2019-01-01 UTC in Unix timestamp # Score is always stored in UTC even if datetime is in a different tz tz = timezone(timedelta(hours=7)) |