summaryrefslogtreecommitdiff
path: root/chromium/build/fuchsia/run_package.py
blob: ed2cca3bf12105002e31b5cf7ef2ec86dbfabae0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# Copyright 2018 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Contains a helper function for deploying and executing a packaged
executable on a Target."""

from __future__ import print_function

import common
import hashlib
import logging
import multiprocessing
import os
import re
import select
import subprocess
import sys
import threading
import uuid

from symbolizer import BuildIdsPaths, RunSymbolizer, SymbolizerFilter

FAR = common.GetHostToolPathFromPlatform('far')

# Amount of time to wait for the termination of the system log output thread.
_JOIN_TIMEOUT_SECS = 5


def _AttachKernelLogReader(target):
  """Attaches a kernel log reader as a long-running SSH task."""

  logging.info('Attaching kernel logger.')
  return target.RunCommandPiped(['dlog', '-f'], stdin=open(os.devnull, 'r'),
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)


class SystemLogReader(object):
  """Collects and symbolizes Fuchsia system log to a file."""

  def __init__(self):
    self._listener_proc = None
    self._symbolizer_proc = None
    self._system_log = None

  def __enter__(self):
      return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Stops the system logging processes and closes the output file."""
    if self._symbolizer_proc:
      self._symbolizer_proc.kill()
    if self._listener_proc:
      self._listener_proc.kill()
    if self._system_log:
      self._system_log.close()

  def Start(self, target, package_paths, system_log_file):
    """Start a system log reader as a long-running SSH task."""
    logging.debug('Writing fuchsia system log to %s' % system_log_file)

    self._listener_proc = target.RunCommandPiped(['log_listener'],
                                                  stdout=subprocess.PIPE,
                                                  stderr=subprocess.STDOUT)

    self._system_log = open(system_log_file,'w', buffering=1)
    self._symbolizer_proc = RunSymbolizer(self._listener_proc.stdout,
                                          self._system_log,
                                          BuildIdsPaths(package_paths))


class MergedInputStream(object):
  """Merges a number of input streams into a UNIX pipe on a dedicated thread.
  Terminates when the file descriptor of the primary stream (the first in
  the sequence) is closed."""

  def __init__(self, streams):
    assert len(streams) > 0
    self._streams = streams
    self._output_stream = None
    self._thread = None

  def Start(self):
    """Returns a pipe to the merged output stream."""

    read_pipe, write_pipe = os.pipe()

    # Disable buffering for the stream to make sure there is no delay in logs.
    self._output_stream = os.fdopen(write_pipe, 'w', 0)
    self._thread = threading.Thread(target=self._Run)
    self._thread.start();

    return os.fdopen(read_pipe, 'r')

  def _Run(self):
    streams_by_fd = {}
    primary_fd = self._streams[0].fileno()
    for s in self._streams:
      streams_by_fd[s.fileno()] = s

    # Set when the primary FD is closed. Input from other FDs will continue to
    # be processed until select() runs dry.
    flush = False

    # The lifetime of the MergedInputStream is bound to the lifetime of
    # |primary_fd|.
    while primary_fd:
      # When not flushing: block until data is read or an exception occurs.
      rlist, _, xlist = select.select(streams_by_fd, [], streams_by_fd)

      if len(rlist) == 0 and flush:
        break

      for fileno in xlist:
        del streams_by_fd[fileno]
        if fileno == primary_fd:
          primary_fd = None

      for fileno in rlist:
        line = streams_by_fd[fileno].readline()
        if line:
          self._output_stream.write(line + '\n')
        else:
          del streams_by_fd[fileno]
          if fileno == primary_fd:
            primary_fd = None

    # Flush the streams by executing nonblocking reads from the input file
    # descriptors until no more data is available,  or all the streams are
    # closed.
    while streams_by_fd:
      rlist, _, _ = select.select(streams_by_fd, [], [], 0)

      if not rlist:
        break

      for fileno in rlist:
        line = streams_by_fd[fileno].readline()
        if line:
          self._output_stream.write(line + '\n')
        else:
          del streams_by_fd[fileno]


def _GetComponentUri(package_name):
  return 'fuchsia-pkg://fuchsia.com/%s#meta/%s.cmx' % (package_name,
                                                       package_name)


class RunPackageArgs:
  """RunPackage() configuration arguments structure.

  symbolizer_config: A newline delimited list of source files contained
      in the package. Omitting this parameter will disable symbolization.
  system_logging: If set, connects a system log reader to the target.
  """
  def __init__(self):
    self.symbolizer_config = None
    self.system_logging = False

  @staticmethod
  def FromCommonArgs(args):
    run_package_args = RunPackageArgs()
    run_package_args.system_logging = args.include_system_logs
    return run_package_args


def _DrainStreamToStdout(stream, quit_event):
  """Outputs the contents of |stream| until |quit_event| is set."""

  while not quit_event.is_set():
    rlist, _, _ = select.select([ stream ], [], [], 0.1)
    if rlist:
      line = rlist[0].readline()
      if not line:
        return
      print(line.rstrip())


def RunPackage(output_dir, target, package_paths, package_name,
               package_args, args):
  """Installs the Fuchsia package at |package_path| on the target,
  executes it with |package_args|, and symbolizes its output.

  output_dir: The path containing the build output files.
  target: The deployment Target object that will run the package.
  package_paths: The paths to the .far packages to be installed.
  package_name: The name of the primary package to run.
  package_args: The arguments which will be passed to the Fuchsia process.
  args: Structure of arguments to configure how the package will be run.

  Returns the exit code of the remote package process."""

  system_logger = (
      _AttachKernelLogReader(target) if args.system_logging else None)
  try:
    if system_logger:
      # Spin up a thread to asynchronously dump the system log to stdout
      # for easier diagnoses of early, pre-execution failures.
      log_output_quit_event = multiprocessing.Event()
      log_output_thread = threading.Thread(
          target=
          lambda: _DrainStreamToStdout(system_logger.stdout, log_output_quit_event)
      )
      log_output_thread.daemon = True
      log_output_thread.start()

    with target.GetAmberRepo():
      target.InstallPackage(package_paths)

      if system_logger:
        log_output_quit_event.set()
        log_output_thread.join(timeout=_JOIN_TIMEOUT_SECS)

      logging.info('Running application.')
      command = ['run', _GetComponentUri(package_name)] + package_args
      process = target.RunCommandPiped(
          command,
          stdin=open(os.devnull, 'r'),
          stdout=subprocess.PIPE,
          stderr=subprocess.STDOUT)

      if system_logger:
        output_stream = MergedInputStream(
            [process.stdout, system_logger.stdout]).Start()
      else:
        output_stream = process.stdout

      # Run the log data through the symbolizer process.
      output_stream = SymbolizerFilter(output_stream,
                                       BuildIdsPaths(package_paths))

      for next_line in output_stream:
        print(next_line.rstrip())

      process.wait()
      if process.returncode == 0:
        logging.info('Process exited normally with status code 0.')
      else:
        # The test runner returns an error status code if *any* tests fail,
        # so we should proceed anyway.
        logging.warning(
            'Process exited with status code %d.' % process.returncode)

  finally:
    if system_logger:
      logging.info('Terminating kernel log reader.')
      log_output_quit_event.set()
      log_output_thread.join()
      system_logger.kill()

  return process.returncode