summaryrefslogtreecommitdiff
path: root/zuul/lib/log_streamer.py
blob: f96f4427949454887a69d19974fb841531d42edb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python

# Copyright (c) 2016 IBM Corp.
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import os
import os.path
import re
import select
import threading
import time

from zuul.lib import streamer_utils


class Log(object):

    def __init__(self, path):
        self.path = path
        # The logs are written as binary encoded utf-8, which is what we
        # send over the wire.
        self.file = open(path, 'rb')
        self.stat = os.stat(path)
        self.size = self.stat.st_size


class RequestHandler(streamer_utils.BaseFingerRequestHandler):
    '''
    Class to handle a single log streaming request.

    The log streaming code was blatantly stolen from zuul_console.py. Only
    the (class/method/attribute) names were changed to protect the innocent.
    '''

    log = logging.getLogger("zuul.log_streamer")

    def handle(self):
        try:
            build_uuid = self.getCommand()
        except Exception:
            self.log.exception("Failure during getCommand:")
            msg = 'Internal streaming error'
            self.request.sendall(msg.encode("utf-8"))
            return

        # validate build ID
        if not re.match("[0-9A-Fa-f]+$", build_uuid):
            msg = 'Build ID %s is not valid' % build_uuid
            self.request.sendall(msg.encode("utf-8"))
            return

        job_dir = os.path.join(self.server.jobdir_root, build_uuid)
        if not os.path.exists(job_dir):
            msg = 'Build ID %s not found' % build_uuid
            self.request.sendall(msg.encode("utf-8"))
            return

        # check if log file exists
        log_file = os.path.join(job_dir, 'work', 'logs', 'job-output.txt')
        if not os.path.exists(log_file):
            msg = 'Log not found for build ID %s' % build_uuid
            self.request.sendall(msg.encode("utf-8"))
            return

        try:
            self.stream_log(log_file)
        except Exception:
            self.log.exception("Streaming failure for build UUID %s:",
                               build_uuid)
            msg = 'Internal streaming error'
            self.request.sendall(msg.encode("utf-8"))

    def stream_log(self, log_file):
        log = None
        while True:
            if log is not None:
                try:
                    log.file.close()
                except Exception:
                    pass
            while True:
                log = self.chunk_log(log_file)
                if log:
                    break
                time.sleep(0.5)
            while True:
                if self.follow_log(log):
                    break
                else:
                    return

    def chunk_log(self, log_file):
        try:
            log = Log(log_file)
        except Exception:
            return
        while True:
            chunk = log.file.read(4096)
            if not chunk:
                break
            self.request.send(chunk)
        return log

    def follow_log(self, log):
        while True:
            # As long as we have unread data, keep reading/sending
            while True:
                chunk = log.file.read(4096)
                if chunk:
                    self.request.send(chunk)
                else:
                    break

            # See if the file has been removed, meaning we should stop
            # streaming it.
            if not os.path.exists(log.path):
                return False

            # At this point, we are waiting for more data to be written
            time.sleep(0.5)

            # Check to see if the remote end has sent any data, if so,
            # discard
            r, w, e = select.select([self.request], [], [self.request], 0)
            if self.request in e:
                return False
            if self.request in r:
                ret = self.request.recv(1024)
                # Discard anything read, if input is eof, it has
                # disconnected.
                if not ret:
                    return False


class LogStreamerServer(streamer_utils.CustomThreadingTCPServer):

    def __init__(self, *args, **kwargs):
        self.jobdir_root = kwargs.pop('jobdir_root')
        super(LogStreamerServer, self).__init__(*args, **kwargs)


class LogStreamer(object):
    '''
    Class implementing log streaming over the finger daemon port.
    '''

    def __init__(self, host, port, jobdir_root):
        self.log = logging.getLogger('zuul.log_streamer')
        self.log.debug("LogStreamer starting on port %s", port)
        self.server = LogStreamerServer((host, port),
                                        RequestHandler,
                                        jobdir_root=jobdir_root)

        # We start the actual serving within a thread so we can return to
        # the owner.
        self.thd = threading.Thread(target=self._run)
        self.thd.daemon = True
        self.thd.start()

    def _run(self):
        try:
            self.server.serve_forever()
        except Exception:
            self.log.exception("Abnormal termination:")
            raise

    def stop(self):
        if self.thd.isAlive():
            self.server.shutdown()
            self.server.server_close()
            self.thd.join()
            self.log.debug("LogStreamer stopped")