path: root/swift/common/daemon.py
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import os
import sys
import time
import signal
from re import sub

import eventlet.debug
from eventlet.hubs import use_hub

from swift.common import utils


class Daemon(object):
    """
    Daemon base class

    A daemon has a run method that accepts a ``once`` kwarg and will dispatch
    to :meth:`run_once` or :meth:`run_forever`.

    A subclass of Daemon must implement :meth:`run_once` and
    :meth:`run_forever`.

    A subclass of Daemon may override :meth:`get_worker_args` to dispatch
    arguments to individual child process workers, and :meth:`is_healthy` to
    perform context-specific periodic wellness checks which can reset worker
    arguments.

    Implementations of Daemon do not know *how* to daemonize or execute
    multiple daemonized workers; they simply provide the behavior of the
    daemon and context-specific knowledge about how workers should be started.
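
    A minimal sketch of a subclass (``ExampleDaemon`` and its ``interval``
    option are illustrative, not part of this module)::

        import time

        from swift.common.daemon import Daemon

        class ExampleDaemon(Daemon):
            def run_once(self, *args, **kwargs):
                self.logger.info('Performing a single pass')

            def run_forever(self, *args, **kwargs):
                interval = float(self.conf.get('interval', 30))
                while True:
                    self.run_once()
                    time.sleep(interval)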
    """
    WORKERS_HEALTHCHECK_INTERVAL = 5.0

    def __init__(self, conf):
        self.conf = conf
        self.logger = utils.get_logger(conf, log_route='daemon')

    def run_once(self, *args, **kwargs):
        """Override this to run the script once"""
        raise NotImplementedError('run_once not implemented')

    def run_forever(self, *args, **kwargs):
        """Override this to run forever"""
        raise NotImplementedError('run_forever not implemented')

    def run(self, once=False, **kwargs):
        if once:
            self.run_once(**kwargs)
        else:
            self.run_forever(**kwargs)

    def post_multiprocess_run(self):
        """
        Override this to do something after running using multiple worker
        processes. This method is called in the parent process.

        This is probably only useful for run-once mode since there is no
        "after running" in run-forever mode.
        """
        pass

    def get_worker_args(self, once=False, **kwargs):
        """
        For each worker yield a (possibly empty) dict of kwargs to pass along
        to the daemon's :meth:`run` method after fork.  The number of elements
        returned from this method determines the number of worker processes
        created.

        If the returned iterable is empty, the :class:`DaemonStrategy` will
        fall back to the run-inline strategy.
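
        For example, a subclass that partitions work by device might return
        one dict per device (the ``devices`` option shown here is
        illustrative, not something this base class defines)::

            def get_worker_args(self, once=False, **kwargs):
                devices = [d.strip() for d in
                           self.conf.get('devices', '').split(',')
                           if d.strip()]
                return [{'devices': [d]} for d in devices]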

        :param once: False if the worker(s) will be daemonized, True if the
            worker(s) will be run once
        :param kwargs: plumbed through via command line argparser

        :returns: an iterable of dicts, each element represents the kwargs to
                  be passed to a single worker's :meth:`run` method after fork.
        """
        return []

    def is_healthy(self):
        """
        This method is called periodically (roughly every
        ``WORKERS_HEALTHCHECK_INTERVAL`` seconds) on the instance of the
        daemon held by the parent process.  If it returns False, all child
        workers are terminated, and new workers will be created.
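
        For example, a subclass whose workers are partitioned by ring
        membership might re-check the ring on each call (a sketch; the
        attribute and helper names are illustrative)::

            def is_healthy(self):
                return self.get_local_devices() == self.all_local_devices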

        :returns: a boolean, True only if all workers should continue to run
        """
        return True


class DaemonStrategy(object):
    """
    This is the execution strategy for using subclasses of Daemon.  The default
    behavior is to invoke the daemon's :meth:`Daemon.run` method from within
    the parent process.  When the :meth:`Daemon.run` method returns, the parent
    process will exit.

    However, if the Daemon returns a non-empty iterable from
    :meth:`Daemon.get_worker_args`, the daemon's :meth:`Daemon.run` method will
    be invoked in child processes, with the arguments provided from the parent
    process's instance of the daemon.  If a child process exits, it will be
    restarted with the same options, unless it was executed in once mode.
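
    For example, a sketch of driving the strategy directly (``MyDaemon`` is an
    illustrative :class:`Daemon` subclass; normally :func:`run_daemon` does
    this for you)::

        daemon = MyDaemon({'user': 'swift'})
        DaemonStrategy(daemon, daemon.logger).run(once=True)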

    :param daemon: an instance of a :class:`Daemon` (has a `run` method)
    :param logger: a logger instance
    """

    def __init__(self, daemon, logger):
        self.daemon = daemon
        self.logger = logger
        self.running = False
        # only used by multi-worker strategy
        self.options_by_pid = {}
        self.unspawned_worker_options = []

    def setup(self, **kwargs):
        utils.validate_configuration()
        utils.drop_privileges(self.daemon.conf.get('user', 'swift'))
        utils.clean_up_daemon_hygiene()
        utils.capture_stdio(self.logger, **kwargs)

        def kill_children(*args):
            self.running = False
            self.logger.notice('SIGTERM received (%s)', os.getpid())
            signal.signal(signal.SIGTERM, signal.SIG_IGN)
            os.killpg(0, signal.SIGTERM)
            os._exit(0)

        signal.signal(signal.SIGTERM, kill_children)
        self.running = True
        utils.systemd_notify(self.logger)

    def _run_inline(self, once=False, **kwargs):
        """Run the daemon"""
        self.daemon.run(once=once, **kwargs)

    def run(self, once=False, **kwargs):
        """Daemonize and execute our strategy"""
        self.setup(**kwargs)
        try:
            self._run(once=once, **kwargs)
        except KeyboardInterrupt:
            self.logger.notice('User quit')
        finally:
            self.cleanup()
        self.running = False

    def _fork(self, once, **kwargs):
        pid = os.fork()
        if pid == 0:
            signal.signal(signal.SIGHUP, signal.SIG_DFL)
            signal.signal(signal.SIGTERM, signal.SIG_DFL)

            self.daemon.run(once, **kwargs)

            self.logger.debug('Forked worker %s finished', os.getpid())
            # do not return from this stack, nor execute any finally blocks
            os._exit(0)
        else:
            self.register_worker_start(pid, kwargs)
        return pid

    def iter_unspawned_workers(self):
        while True:
            try:
                per_worker_options = self.unspawned_worker_options.pop()
            except IndexError:
                return
            yield per_worker_options

    def spawned_pids(self):
        return list(self.options_by_pid.keys())

    def register_worker_start(self, pid, per_worker_options):
        self.logger.debug('Spawned worker %s with %r', pid, per_worker_options)
        self.options_by_pid[pid] = per_worker_options

    def register_worker_exit(self, pid):
        self.unspawned_worker_options.append(self.options_by_pid.pop(pid))

    def ask_daemon_to_prepare_workers(self, once, **kwargs):
        self.unspawned_worker_options = list(
            self.daemon.get_worker_args(once=once, **kwargs))

    def abort_workers_if_daemon_would_like(self):
        if not self.daemon.is_healthy():
            self.logger.debug(
                'Daemon needs to change options, aborting workers')
            self.cleanup()
            return True
        return False

    def check_on_all_running_workers(self):
        for p in self.spawned_pids():
            try:
                pid, status = os.waitpid(p, os.WNOHANG)
            except OSError as err:
                if err.errno not in (errno.EINTR, errno.ECHILD):
                    raise
                self.logger.notice('Worker %s died', p)
            else:
                if pid == 0:
                    # child still running
                    continue
                self.logger.debug('Worker %s exited', p)
            self.register_worker_exit(p)

    def _run(self, once, **kwargs):
        self.ask_daemon_to_prepare_workers(once, **kwargs)
        if not self.unspawned_worker_options:
            return self._run_inline(once, **kwargs)
        for per_worker_options in self.iter_unspawned_workers():
            if self._fork(once, **per_worker_options) == 0:
                return 0
        while self.running:
            if self.abort_workers_if_daemon_would_like():
                self.ask_daemon_to_prepare_workers(once, **kwargs)
            self.check_on_all_running_workers()
            if not once:
                for per_worker_options in self.iter_unspawned_workers():
                    if self._fork(once, **per_worker_options) == 0:
                        return 0
            else:
                if not self.spawned_pids():
                    self.logger.notice('Finished %s', os.getpid())
                    break
            time.sleep(self.daemon.WORKERS_HEALTHCHECK_INTERVAL)
        self.daemon.post_multiprocess_run()
        return 0

    def cleanup(self):
        for p in self.spawned_pids():
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno not in (errno.ESRCH, errno.EINTR, errno.ECHILD):
                    raise
            self.register_worker_exit(p)
            self.logger.debug('Cleaned up worker %s', p)


def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
    """
    Loads settings from conf, then instantiates daemon ``klass`` and runs the
    daemon with the specified ``once`` kwarg.  The section_name will be derived
    from the daemon ``klass`` if not provided (e.g. ObjectReplicator =>
    object-replicator).

    :param klass: Class to instantiate, subclass of :class:`Daemon`
    :param conf_file: Path to configuration file
    :param section_name: Section name from conf file to load config from
    :param once: Passed to daemon :meth:`Daemon.run` method
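
    For example, a one-shot invocation might look like this (the daemon class
    and config path are illustrative)::

        from swift.obj.replicator import ObjectReplicator

        run_daemon(ObjectReplicator, '/etc/swift/object-server.conf',
                   once=True)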
    """
    # very often the config section_name is based on the class name
    # the None singleton will be passed through to readconf as is
    if section_name == '':
        section_name = sub(r'([a-z])([A-Z])', r'\1-\2',
                           klass.__name__).lower()
    try:
        conf = utils.readconf(conf_file, section_name,
                              log_name=kwargs.get('log_name'))
    except (ValueError, IOError) as e:
        # The error message will be printed to stderr
        # and the process will exit with a status of 1.
        sys.exit(e)

    use_hub(utils.get_hub())

    # once on the command line (or daemonize=false in the config) forces
    # run-once mode, overriding the default
    once = once or not utils.config_true_value(conf.get('daemonize', 'true'))

    # pre-configure logger
    if 'logger' in kwargs:
        logger = kwargs.pop('logger')
    else:
        logger = utils.get_logger(conf, conf.get('log_name', section_name),
                                  log_to_console=kwargs.pop('verbose', False),
                                  log_route=section_name)

    # optional nice/ionice priority scheduling
    utils.modify_priority(conf, logger)

    # disable fallocate if desired
    if utils.config_true_value(conf.get('disable_fallocate', 'no')):
        utils.disable_fallocate()
    # set utils.FALLOCATE_RESERVE if desired
    utils.FALLOCATE_RESERVE, utils.FALLOCATE_IS_PERCENT = \
        utils.config_fallocate_value(conf.get('fallocate_reserve', '1%'))

    # By default, disable eventlet printing stacktraces
    eventlet_debug = utils.config_true_value(conf.get('eventlet_debug', 'no'))
    eventlet.debug.hub_exceptions(eventlet_debug)

    # Ensure TZ environment variable exists to avoid stat('/etc/localtime') on
    # some platforms. This locks in reported times to UTC.
    os.environ['TZ'] = 'UTC+0'
    time.tzset()

    logger.notice('Starting %s', os.getpid())
    try:
        d = klass(conf)
        DaemonStrategy(d, logger).run(once=once, **kwargs)
    except KeyboardInterrupt:
        logger.info('User quit')
    logger.notice('Exited %s', os.getpid())
    return d