summaryrefslogtreecommitdiff
path: root/zuul/webapp.py
blob: b5fdc0ee547776d0d33b5d97a473f777110231c9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import copy
import json
import logging
import re
import threading
import time
from paste import httpserver
import webob
from webob import dec

from zuul.lib import encryption

"""Zuul main web app.

Zuul supports HTTP requests directly against it for determining the
change status. These responses are provided as json data structures.

The supported urls are:

 - /status: return a complex data structure that represents the entire
   queue / pipeline structure of the system
 - /status.json (backwards compatibility): same as /status
 - /status/change/X,Y: return status just for gerrit change X,Y
 - /keys/SOURCE/PROJECT.pub: return the public key for PROJECT

When returning status for a single gerrit change you will get an
array of changes, they will not include the queue structure.
"""


class WebApp(threading.Thread):
    log = logging.getLogger("zuul.WebApp")
    change_path_regexp = '/status/change/(.*)$'

    def __init__(self, scheduler, port=8001, cache_expiry=1,
                 listen_address='0.0.0.0'):
        threading.Thread.__init__(self)
        self.scheduler = scheduler
        self.listen_address = listen_address
        self.port = port
        self.cache_expiry = cache_expiry
        self.cache_time = 0
        self.cache = {}
        self.daemon = True
        self.routes = {}
        self._init_default_routes()
        self.server = httpserver.serve(
            dec.wsgify(self.app), host=self.listen_address, port=self.port,
            start_loop=False)

    def _init_default_routes(self):
        self.register_path('/(status\.json|status)$', self.status)
        self.register_path(self.change_path_regexp, self.change)

    def run(self):
        self.server.serve_forever()

    def stop(self):
        self.server.server_close()

    def _changes_by_func(self, func, tenant_name):
        """Filter changes by a user provided function.

        In order to support arbitrary collection of subsets of changes
        we provide a low level filtering mechanism that takes a
        function which applies to changes. The output of this function
        is a flattened list of those collected changes.
        """
        status = []
        jsonstruct = json.loads(self.cache[tenant_name])
        for pipeline in jsonstruct['pipelines']:
            for change_queue in pipeline['change_queues']:
                for head in change_queue['heads']:
                    for change in head:
                        if func(change):
                            status.append(copy.deepcopy(change))
        return json.dumps(status)

    def _status_for_change(self, rev, tenant_name):
        """Return the statuses for a particular change id X,Y."""
        def func(change):
            return change['id'] == rev
        return self._changes_by_func(func, tenant_name)

    def register_path(self, path, handler):
        path_re = re.compile(path)
        self.routes[path] = (path_re, handler)

    def unregister_path(self, path):
        if self.routes.get(path):
            del self.routes[path]

    def _handle_keys(self, request, path):
        m = re.match('/keys/(.*?)/(.*?).pub', path)
        if not m:
            raise webob.exc.HTTPBadRequest()
        source_name = m.group(1)
        project_name = m.group(2)
        source = self.scheduler.connections.getSource(source_name)
        if not source:
            raise webob.exc.HTTPNotFound(
                detail="Cannot locate a source named %s" % source_name)
        project = source.getProject(project_name)
        if not project or not hasattr(project, 'public_key'):
            raise webob.exc.HTTPNotFound(
                detail="Cannot locate a project named %s" % project_name)

        pem_public_key = encryption.serialize_rsa_public_key(
            project.public_key)

        response = webob.Response(body=pem_public_key,
                                  content_type='text/plain')
        return response.conditional_response_app

    def app(self, request):
        # Try registered paths without a tenant_name first
        path = request.path
        for path_re, handler in self.routes.values():
            if path_re.match(path):
                return handler(path, '', request)

        # Now try with a tenant_name stripped
        x, tenant_name, path = request.path.split('/', 2)
        path = '/' + path
        # Handle keys
        if path.startswith('/keys'):
            try:
                return self._handle_keys(request, path)
            except Exception as e:
                self.log.exception("Issue with _handle_keys")
                raise
        for path_re, handler in self.routes.values():
            if path_re.match(path):
                return handler(path, tenant_name, request)
        else:
            raise webob.exc.HTTPNotFound()

    def status(self, path, tenant_name, request):
        def func():
            return webob.Response(body=self.cache[tenant_name],
                                  content_type='application/json',
                                  charset='utf8')
        if tenant_name not in self.scheduler.abide.tenants:
            raise webob.exc.HTTPNotFound()
        return self._response_with_status_cache(func, tenant_name)

    def change(self, path, tenant_name, request):
        def func():
            m = re.match(self.change_path_regexp, path)
            change_id = m.group(1)
            status = self._status_for_change(change_id, tenant_name)
            if status:
                return webob.Response(body=status,
                                      content_type='application/json',
                                      charset='utf8')
            else:
                raise webob.exc.HTTPNotFound()
        return self._response_with_status_cache(func, tenant_name)

    def _refresh_status_cache(self, tenant_name):
        if (tenant_name not in self.cache or
            (time.time() - self.cache_time) > self.cache_expiry):
            try:
                self.cache[tenant_name] = self.scheduler.formatStatusJSON(
                    tenant_name)
                # Call time.time() again because formatting above may take
                # longer than the cache timeout.
                self.cache_time = time.time()
            except Exception:
                self.log.exception("Exception formatting status:")
                raise

    def _response_with_status_cache(self, func, tenant_name):
        self._refresh_status_cache(tenant_name)

        response = func()

        response.headers['Access-Control-Allow-Origin'] = '*'

        response.cache_control.public = True
        response.cache_control.max_age = self.cache_expiry
        response.last_modified = self.cache_time
        response.expires = self.cache_time + self.cache_expiry

        return response.conditional_response_app