path: root/apscheduler/jobstores/redis_store.py
"""
Stores jobs in a Redis database.
"""
from uuid import uuid4
from datetime import datetime
import logging

from apscheduler.jobstores.base import JobStore
from apscheduler.job import Job

try:
    import cPickle as pickle  # C implementation, faster on Python 2
except ImportError:  # pragma: nocover
    import pickle

try:
    from redis import StrictRedis
except ImportError:  # pragma: nocover
    raise ImportError('RedisJobStore requires the redis library to be installed')

try:
    long = long
except NameError:  # Python 3: 'long' was merged into 'int'
    long = int

logger = logging.getLogger(__name__)


class RedisJobStore(JobStore):
    def __init__(self, db=0, key_prefix='jobs.', pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
        self.jobs = []
        self.pickle_protocol = pickle_protocol
        self.key_prefix = key_prefix

        # Redis selects databases by numeric index, so 0 is a valid value;
        # only an outright missing "db" is an error.
        if db is None:
            raise ValueError('The "db" parameter must not be empty')
        if not key_prefix:
            raise ValueError('The "key_prefix" parameter must not be empty')

        self.redis = StrictRedis(db=int(db), **connect_args)

    def add_job(self, job):
        job.id = str(uuid4())
        job_state = job.__getstate__()
        # The dict values are evaluated in order: the state is pickled before
        # next_run_time is popped, so the pickle still contains it; load_jobs()
        # simply overwrites it from the separate hash field.
        job_dict = {
            'job_state': pickle.dumps(job_state, self.pickle_protocol),
            'runs': '0',
            'next_run_time': job_state.pop('next_run_time').isoformat()
        }
        self.redis.hmset(self.key_prefix + job.id, job_dict)
        self.jobs.append(job)

    def remove_job(self, job):
        self.redis.delete(self.key_prefix + job.id)
        self.jobs.remove(job)

    def load_jobs(self):
        jobs = []
        keys = self.redis.keys(self.key_prefix + '*')
        # Fetch every job hash in a single round trip.
        pipeline = self.redis.pipeline()
        for key in keys:
            pipeline.hgetall(key)
        results = pipeline.execute()

        for job_dict in results:
            job_state = {}
            try:
                job = Job.__new__(Job)
                job_state = pickle.loads(job_dict['job_state'.encode()])
                job_state['runs'] = long(job_dict['runs'.encode()])
                dateval = job_dict['next_run_time'.encode()].decode()
                # isoformat() appends fractional seconds when the datetime
                # carries microseconds, so try that format first.
                try:
                    job_state['next_run_time'] = datetime.strptime(dateval, '%Y-%m-%dT%H:%M:%S.%f')
                except ValueError:
                    job_state['next_run_time'] = datetime.strptime(dateval, '%Y-%m-%dT%H:%M:%S')
                job.__setstate__(job_state)
                jobs.append(job)
            except Exception:
                job_name = job_state.get('name', '(unknown)')
                logger.exception('Unable to restore job "%s"', job_name)
        self.jobs = jobs

    def update_job(self, job):
        # Only the run bookkeeping changes between runs, so overwrite those
        # two hash fields instead of re-pickling the whole job state.
        attrs = {
            'next_run_time': job.next_run_time.isoformat(),
            'runs': job.runs
        }
        self.redis.hmset(self.key_prefix + job.id, attrs)

    def close(self):
        self.redis.connection_pool.disconnect()

    def __repr__(self):
        return '<%s>' % self.__class__.__name__
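

# A minimal usage sketch (illustrative, not part of this module; it assumes a
# Redis server on localhost and the APScheduler 2.x Scheduler API):
#
#     from apscheduler.scheduler import Scheduler
#
#     def tick():
#         print('Tick!')
#
#     sched = Scheduler()
#     sched.add_jobstore(RedisJobStore(db=0, key_prefix='example.jobs.'), 'redis')
#     sched.add_interval_job(tick, seconds=30, jobstore='redis')
#     sched.start()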