summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZuul <zuul@review.openstack.org>2019-03-14 15:25:08 +0000
committerGerrit Code Review <review@openstack.org>2019-03-14 15:25:08 +0000
commitac15dc608abb3efa69ce8ad49ff069e21b17b621 (patch)
tree7a92eae32149939ca2299277ec034bdbf6d6d308
parent9a80f7ab86a1fa9d9e77383bf6b305889fa8245c (diff)
parentf0dc2454dadeba04a17263d64574d786cdfcbc17 (diff)
downloadglance-ac15dc608abb3efa69ce8ad49ff069e21b17b621.tar.gz
Merge "Windows multiprocess wsgi"
-rw-r--r--glance/cmd/api.py1
-rw-r--r--glance/cmd/registry.py1
-rw-r--r--glance/common/wsgi.py417
3 files changed, 293 insertions, 126 deletions
diff --git a/glance/cmd/api.py b/glance/cmd/api.py
index 4c35b21bf..1acd3bba8 100644
--- a/glance/cmd/api.py
+++ b/glance/cmd/api.py
@@ -62,6 +62,7 @@ from glance import notifier
CONF = cfg.CONF
CONF.import_group("profiler", "glance.common.wsgi")
logging.register_options(CONF)
+wsgi.register_cli_opts()
# NOTE(rosmaita): Any new exceptions added should preserve the current
# error codes for backward compatibility. The value 99 is returned
diff --git a/glance/cmd/registry.py b/glance/cmd/registry.py
index c222e81e9..85a13665e 100644
--- a/glance/cmd/registry.py
+++ b/glance/cmd/registry.py
@@ -60,6 +60,7 @@ from glance import notifier
CONF = cfg.CONF
CONF.import_group("profiler", "glance.common.wsgi")
logging.register_options(CONF)
+wsgi.register_cli_opts()
def main():
diff --git a/glance/common/wsgi.py b/glance/common/wsgi.py
index dae1e2e5b..103a09b76 100644
--- a/glance/common/wsgi.py
+++ b/glance/common/wsgi.py
@@ -21,10 +21,14 @@ Utility methods for working with WSGI servers
"""
from __future__ import print_function
+import abc
import errno
import functools
import os
+import re
import signal
+import struct
+import subprocess
import sys
import time
@@ -33,6 +37,7 @@ from eventlet.green import ssl
import eventlet.greenio
import eventlet.wsgi
import glance_store
+from os_win import utilsfactory as os_win_utilsfactory
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
@@ -318,6 +323,12 @@ store_opts = [
'using comma.')),
]
+cli_opts = [
+ cfg.StrOpt('pipe-handle',
+ help='This argument is used internally on Windows. Glance '
+ 'passes a pipe handle to child processes, which is then '
+ 'used for inter-process communication.'),
+]
LOG = logging.getLogger(__name__)
@@ -340,8 +351,17 @@ except ImportError:
uwsgi = None
+def register_cli_opts():
+ CONF.register_cli_opts(cli_opts)
+
+
def get_num_workers():
"""Return the configured number of workers."""
+
+ # Windows only: we're already running on the worker side.
+ if os.name == 'nt' and getattr(CONF, 'pipe_handle', None):
+ return 0
+
if CONF.workers is None:
# None implies the number of CPUs limited to 8
# See Launchpad bug #1748916 and the config help text
@@ -475,7 +495,8 @@ def get_asynchronous_eventlet_pool(size=1000):
return pool
-class Server(object):
+@six.add_metaclass(abc.ABCMeta)
+class BaseServer(object):
"""Server class to manage multiple WSGI sockets and applications.
This class requires initialize_glance_store set to True if
@@ -491,24 +512,6 @@ class Server(object):
# NOTE(abhishek): Allows us to only re-initialize glance_store when
# the API's configuration reloads.
self.initialize_glance_store = initialize_glance_store
- self.pgid = os.getpid()
- try:
- # NOTE(flaper87): Make sure this process
- # runs in its own process group.
- # NOTE(lpetrut): This isn't available on Windows, so we're going
- # to use job objects instead.
- os.setpgid(self.pgid, self.pgid)
- except (OSError, AttributeError):
- # NOTE(flaper87): When running glance-control,
- # (glance's functional tests, for example)
- # setpgid fails with EPERM as glance-control
- # creates a fresh session, of which the newly
- # launched service becomes the leader (session
- # leaders may not change process groups)
- #
- # Running glance-(api|registry) is safe and
- # shouldn't raise any error here.
- self.pgid = 0
@staticmethod
def set_signal_handler(signal_name, handler):
@@ -524,13 +527,20 @@ class Server(object):
self.set_signal_handler("SIGHUP", signal.SIG_IGN)
raise exception.SIGHUPInterrupt
+ @abc.abstractmethod
def kill_children(self, *args):
- """Kills the entire process group."""
- self.set_signal_handler("SIGTERM", signal.SIG_IGN)
- self.set_signal_handler("SIGINT", signal.SIG_IGN)
- self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
- self.running = False
- os.killpg(self.pgid, signal.SIGTERM)
+ pass
+
+ @abc.abstractmethod
+ def wait_on_children(self):
+ pass
+
+ @abc.abstractmethod
+ def run_child(self):
+ pass
+
+ def reload(self):
+ raise NotImplementedError()
def start(self, application, default_port):
"""
@@ -562,50 +572,6 @@ class Server(object):
def create_pool(self):
return get_asynchronous_eventlet_pool(size=self.threads)
- def _remove_children(self, pid):
- if pid in self.children:
- self.children.remove(pid)
- LOG.info(_LI('Removed dead child %s'), pid)
- elif pid in self.stale_children:
- self.stale_children.remove(pid)
- LOG.info(_LI('Removed stale child %s'), pid)
- else:
- LOG.warn(_LW('Unrecognised child %s') % pid)
-
- def _verify_and_respawn_children(self, pid, status):
- if len(self.stale_children) == 0:
- LOG.debug('No stale children')
- if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
- LOG.error(_LE('Not respawning child %d, cannot '
- 'recover from termination') % pid)
- if not self.children and not self.stale_children:
- LOG.info(
- _LI('All workers have terminated. Exiting'))
- self.running = False
- else:
- if len(self.children) < get_num_workers():
- self.run_child()
-
- def wait_on_children(self):
- while self.running:
- try:
- pid, status = os.wait()
- if os.WIFEXITED(status) or os.WIFSIGNALED(status):
- self._remove_children(pid)
- self._verify_and_respawn_children(pid, status)
- except OSError as err:
- if err.errno not in (errno.EINTR, errno.ECHILD):
- raise
- except KeyboardInterrupt:
- LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
- break
- except exception.SIGHUPInterrupt:
- self.reload()
- continue
- eventlet.greenio.shutdown_safe(self.sock)
- self.sock.close()
- LOG.debug('Exited')
-
def configure(self, old_conf=None, has_changed=None):
"""
Apply configuration settings
@@ -622,35 +588,6 @@ class Server(object):
else:
initialize_glance_store()
- def reload(self):
- """
- Reload and re-apply configuration settings
-
- Existing child processes are sent a SIGHUP signal
- and will exit after completing existing requests.
- New child processes, which will have the updated
- configuration, are spawned. This allows preventing
- interruption to the service.
- """
- def _has_changed(old, new, param):
- old = old.get(param)
- new = getattr(new, param)
- return (new != old)
-
- old_conf = utils.stash_conf_values()
- has_changed = functools.partial(_has_changed, old_conf, CONF)
- CONF.reload_config_files()
- os.killpg(self.pgid, signal.SIGHUP)
- self.stale_children = self.children
- self.children = set()
-
- # Ensure any logging config changes are picked up
- logging.setup(CONF, 'glance')
- config.set_config_defaults()
-
- self.configure(old_conf, has_changed)
- self.start_wsgi()
-
def wait(self):
"""Wait until all servers have completed running."""
try:
@@ -661,34 +598,6 @@ class Server(object):
except KeyboardInterrupt:
pass
- def run_child(self):
- def child_hup(*args):
- """Shuts down child processes, existing requests are handled."""
- self.set_signal_handler("SIGHUP", signal.SIG_IGN)
- eventlet.wsgi.is_accepting = False
- self.sock.close()
-
- pid = os.fork()
- if pid == 0:
- self.set_signal_handler("SIGHUP", child_hup)
- self.set_signal_handler("SIGTERM", signal.SIG_DFL)
- # ignore the interrupt signal to avoid a race whereby
- # a child worker receives the signal before the parent
- # and is respawned unnecessarily as a result
- self.set_signal_handler("SIGINT", signal.SIG_IGN)
- # The child has no need to stash the unwrapped
- # socket, and the reference prevents a clean
- # exit on sighup
- self._sock = None
- self.run_server()
- LOG.info(_LI('Child %d exiting normally'), os.getpid())
- # self.pool.waitall() is now called in wsgi's server so
- # it's safe to exit here
- sys.exit(0)
- else:
- LOG.info(_LI('Started child %s'), pid)
- self.children.add(pid)
-
def run_server(self):
"""Run a WSGI server."""
if cfg.CONF.pydev_worker_debug_host:
@@ -796,6 +705,262 @@ class Server(object):
self.sock.listen(CONF.backlog)
+class PosixServer(BaseServer):
+ def __init__(self, *args, **kwargs):
+ super(PosixServer, self).__init__(*args, **kwargs)
+
+ self.pgid = os.getpid()
+ try:
+ # NOTE(flaper87): Make sure this process
+ # runs in its own process group.
+ os.setpgid(self.pgid, self.pgid)
+ except OSError:
+ # NOTE(flaper87): When running glance-control,
+ # (glance's functional tests, for example)
+ # setpgid fails with EPERM as glance-control
+ # creates a fresh session, of which the newly
+ # launched service becomes the leader (session
+ # leaders may not change process groups)
+ #
+ # Running glance-(api|registry) is safe and
+ # shouldn't raise any error here.
+ self.pgid = 0
+
+ def kill_children(self, *args):
+ """Kills the entire process group."""
+ self.set_signal_handler("SIGTERM", signal.SIG_IGN)
+ self.set_signal_handler("SIGINT", signal.SIG_IGN)
+ self.set_signal_handler("SIGCHLD", signal.SIG_IGN)
+ self.running = False
+ os.killpg(self.pgid, signal.SIGTERM)
+
+ def _remove_children(self, pid):
+ if pid in self.children:
+ self.children.remove(pid)
+ LOG.info(_LI('Removed dead child %s'), pid)
+ elif pid in self.stale_children:
+ self.stale_children.remove(pid)
+ LOG.info(_LI('Removed stale child %s'), pid)
+ else:
+ LOG.warn(_LW('Unrecognised child %s') % pid)
+
+ def _verify_and_respawn_children(self, pid, status):
+ if len(self.stale_children) == 0:
+ LOG.debug('No stale children')
+ if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
+ LOG.error(_LE('Not respawning child %d, cannot '
+ 'recover from termination') % pid)
+ if not self.children and not self.stale_children:
+ LOG.info(
+ _LI('All workers have terminated. Exiting'))
+ self.running = False
+ else:
+ if len(self.children) < get_num_workers():
+ self.run_child()
+
+ def wait_on_children(self):
+ while self.running:
+ try:
+ pid, status = os.wait()
+ if os.WIFEXITED(status) or os.WIFSIGNALED(status):
+ self._remove_children(pid)
+ self._verify_and_respawn_children(pid, status)
+ except OSError as err:
+ if err.errno not in (errno.EINTR, errno.ECHILD):
+ raise
+ except KeyboardInterrupt:
+ LOG.info(_LI('Caught keyboard interrupt. Exiting.'))
+ break
+ except exception.SIGHUPInterrupt:
+ self.reload()
+ continue
+ eventlet.greenio.shutdown_safe(self.sock)
+ self.sock.close()
+ LOG.debug('Exited')
+
+ def run_child(self):
+ def child_hup(*args):
+ """Shuts down child processes, existing requests are handled."""
+ self.set_signal_handler("SIGHUP", signal.SIG_IGN)
+ eventlet.wsgi.is_accepting = False
+ self.sock.close()
+
+ pid = os.fork()
+ if pid == 0:
+ self.set_signal_handler("SIGHUP", child_hup)
+ self.set_signal_handler("SIGTERM", signal.SIG_DFL)
+ # ignore the interrupt signal to avoid a race whereby
+ # a child worker receives the signal before the parent
+ # and is respawned unnecessarily as a result
+ self.set_signal_handler("SIGINT", signal.SIG_IGN)
+ # The child has no need to stash the unwrapped
+ # socket, and the reference prevents a clean
+ # exit on sighup
+ self._sock = None
+ self.run_server()
+ LOG.info(_LI('Child %d exiting normally'), os.getpid())
+ # self.pool.waitall() is now called in wsgi's server so
+ # it's safe to exit here
+ sys.exit(0)
+ else:
+ LOG.info(_LI('Started child %s'), pid)
+ self.children.add(pid)
+
+ def reload(self):
+ """
+ Reload and re-apply configuration settings
+
+ Existing child processes are sent a SIGHUP signal
+ and will exit after completing existing requests.
+ New child processes, which will have the updated
+ configuration, are spawned. This allows preventing
+ interruption to the service.
+ """
+ def _has_changed(old, new, param):
+ old = old.get(param)
+ new = getattr(new, param)
+ return (new != old)
+
+ old_conf = utils.stash_conf_values()
+ has_changed = functools.partial(_has_changed, old_conf, CONF)
+ CONF.reload_config_files()
+ os.killpg(self.pgid, signal.SIGHUP)
+ self.stale_children = self.children
+ self.children = set()
+
+ # Ensure any logging config changes are picked up
+ logging.setup(CONF, 'glance')
+ config.set_config_defaults()
+
+ self.configure(old_conf, has_changed)
+ self.start_wsgi()
+
+
+class Win32ProcessLauncher(object):
+ def __init__(self):
+ self._processutils = os_win_utilsfactory.get_processutils()
+
+ self._workers = []
+ self._worker_job_handles = []
+
+ def add_process(self, cmd):
+ LOG.info("Starting subprocess: %s", cmd)
+
+ worker = subprocess.Popen(cmd, close_fds=False)
+ try:
+ job_handle = self._processutils.kill_process_on_job_close(
+ worker.pid)
+ except Exception:
+ LOG.exception("Could not associate child process "
+ "with a job, killing it.")
+ worker.kill()
+ raise
+
+ self._worker_job_handles.append(job_handle)
+ self._workers.append(worker)
+
+ return worker
+
+ def wait(self):
+ pids = [worker.pid for worker in self._workers]
+ if pids:
+ self._processutils.wait_for_multiple_processes(pids,
+ wait_all=True)
+ # By sleeping here, we allow signal handlers to be executed.
+ time.sleep(0)
+
+
+class Win32Server(BaseServer):
+ _py_script_re = re.compile(r'.*\.py\w?$')
+ _sock = None
+
+ def __init__(self, *args, **kwargs):
+ super(Win32Server, self).__init__(*args, **kwargs)
+ self._launcher = Win32ProcessLauncher()
+ self._ioutils = os_win_utilsfactory.get_ioutils()
+
+ def run_child(self):
+ # We're passing copies of the socket through pipes.
+ rfd, wfd = self._ioutils.create_pipe(inherit_handle=True)
+
+ cmd = sys.argv + ['--pipe-handle=%s' % int(rfd)]
+ # Recent setuptools versions will trim '-script.py' and '.exe'
+ # extensions from sys.argv[0].
+ if self._py_script_re.match(sys.argv[0]):
+ cmd = [sys.executable] + cmd
+
+ worker = self._launcher.add_process(cmd)
+ self._ioutils.close_handle(rfd)
+
+ share_sock_buff = self._sock.share(worker.pid)
+ self._ioutils.write_file(
+ wfd,
+ struct.pack('<I', len(share_sock_buff)),
+ 4)
+ self._ioutils.write_file(
+ wfd, share_sock_buff, len(share_sock_buff))
+
+ self.children.add(worker.pid)
+
+ def kill_children(self, *args):
+ # We're using job objects, the children will exit along with the
+ # main process.
+ exit(0)
+
+ def wait_on_children(self):
+ self._launcher.wait()
+
+ def _get_sock_from_parent(self):
+ # This is supposed to be called exactly once in the child process.
+ # We're passing a copy of the socket through a pipe.
+ pipe_handle = int(getattr(CONF, 'pipe_handle', 0))
+ if not pipe_handle:
+ err_msg = _("Did not receive a pipe handle, which is used when "
+ "communicating with the parent process.")
+ raise exception.GlanceException(err_msg)
+
+ # Get the length of the data to be received.
+ buff = self._ioutils.get_buffer(4)
+ self._ioutils.read_file(pipe_handle, buff, 4)
+ socket_buff_sz = struct.unpack('<I', buff)[0]
+
+ # Get the serialized socket object.
+ socket_buff = self._ioutils.get_buffer(socket_buff_sz)
+ self._ioutils.read_file(pipe_handle, socket_buff, socket_buff_sz)
+ self._ioutils.close_handle(pipe_handle)
+
+ # Recreate the socket object. This will only work with
+ # Python 3.6 or later.
+ return socket.fromshare(bytes(socket_buff[:]))
+
+ def configure_socket(self, old_conf=None, has_changed=None):
+ fresh_start = not (old_conf or has_changed)
+ use_ssl = CONF.cert_file or CONF.key_file
+ pipe_handle = getattr(CONF, 'pipe_handle', None)
+
+ if not (fresh_start and pipe_handle):
+ return super(Win32Server, self).configure_socket(
+ old_conf, has_changed)
+
+ self._sock = self._get_sock_from_parent()
+
+ if use_ssl:
+ self.sock = ssl_wrap_socket(self._sock)
+ else:
+ self.sock = self._sock
+
+ if hasattr(socket, 'TCP_KEEPIDLE'):
+ # This was introduced in WS 2016 RS3
+ self.sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
+ CONF.tcp_keepidle)
+
+
+if os.name == 'nt':
+ Server = Win32Server
+else:
+ Server = PosixServer
+
+
class Middleware(object):
"""
Base WSGI middleware wrapper. These classes require an application to be