summaryrefslogtreecommitdiff
path: root/coverage/data.py
blob: 704a3481436f3e14decdbfd072b3e4933dc9c05a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
"""Coverage data for Coverage."""

import glob
import json
import os
import random
import socket

from coverage import env
from coverage.backward import iitems, string_class
from coverage.debug import _TEST_NAME_FILE, pretty_data
from coverage.files import PathAliases
from coverage.misc import CoverageException, file_be_gone


class CoverageData(object):
    """Manages collected coverage data, including file storage.

    This class is the public supported API to coverage.py's data.

    .. note::

        The file format is not documented or guaranteed.  It will change in
        the future, in possibly complicated ways.  Use this API to avoid
        disruption.

    There are three kinds of data that can be collected:

    * **lines**: the line numbers of source lines that were executed.
      These are always available.

    * **arcs**: pairs of source and destination line numbers for transitions
      between source lines.  These are only available if branch coverage was
      used.

    * **plugin names**: the module names of the plugin that handled each file
      in the data.


    To read a coverage.py data file, use :meth:`read_file`, or :meth:`read` if
    you have an already-opened file.  You can then access the line, arc, or
    plugin data with :meth:`lines`, :meth:`arcs`, or :meth:`plugin_name`.

    The :meth:`has_arcs` method indicates whether arc data is available.  You
    can get a list of the files in the data with :meth:`measured_files`.
    A summary of the line data is available from :meth:`line_counts`.  As with
    most Python containers, you can determine if there is any data at all by
    using this object as a boolean value.


    Most data files will be created by coverage.py itself, but you can use
    methods here to create data files if you like.  The :meth:`set_lines`,
    :meth:`set_arcs`, and :meth:`set_plugins` methods add data, in ways that
    are convenient for coverage.py.  To add a file without any measured data,
    use :meth:`touch_file`.

    You write to a named file with :meth:`write_file`, or to an already opened
    file with :meth:`write`.

    You can clear the data in memory with :meth:`erase`.  Two data collections
    can be combined by using :meth:`update` on one `CoverageData`, passing it
    the other.

    """

    # The data file format is JSON, with these keys:
    #
    #     * lines: a dict mapping filenames to lists of line numbers
    #       executed::
    #
    #         { 'file1': [17,23,45], 'file2': [1,2,3], ... }
    #
    #     * arcs: a dict mapping filenames to lists of line number pairs::
    #
    #         { 'file1': [[17,23], [17,25], [25,26]], ... }
    #
    #     * plugins: a dict mapping filenames to plugin names::
    #
    #         { 'file1': "django.coverage", ... }
    #
    # Only one of `lines` or `arcs` will be present: with branch coverage, data
    # is stored as arcs. Without branch coverage, it is stored as lines.  The
    # line data is easily recovered from the arcs: it is all the first elements
    # of the pairs that are greater than zero.

    def __init__(self, debug=None):
        """Create a CoverageData.

        `debug` is a `DebugControl` object for writing debug messages.

        """
        self._debug = debug

        # A map from canonical Python source file name to a dictionary in
        # which there's an entry for each line number that has been
        # executed:
        #
        #   { 'filename1.py': [12, 47, 1001], ... }
        #
        self._lines = {}

        # A map from canonical Python source file name to a dictionary with an
        # entry for each pair of line numbers forming an arc:
        #
        #   { 'filename1.py': [(12,14), (47,48), ... ], ... }
        #
        self._arcs = {}

        # A map from canonical source file name to a plugin module name:
        #
        #   { 'filename1.py': 'django.coverage', ... }
        #
        self._plugins = {}

    ##
    ## Reading data
    ##

    def has_arcs(self):
        """Does this data have arcs?

        Arc data is only available if branch coverage was used during
        collection.

        Returns a boolean.

        """
        return self._has_arcs()

    def lines(self, filename):
        """Get the list of lines executed for a file.

        If the file was not measured, returns None.  A file might be measured,
        and have no lines executed, in which case an empty list is returned.

        """
        if self._arcs:
            if filename in self._arcs:
                return [s for s, __ in self._arcs[filename] if s > 0]
        else:
            if filename in self._lines:
                return self._lines[filename]
        return None

    def arcs(self, filename):
        """Get the list of arcs executed for a file.

        If the file was not measured, returns None.  A file might be measured,
        and have no arcs executed, in which case an empty list is returned.

        """
        if filename in self._arcs:
            return self._arcs[filename]
        return None

    def plugin_name(self, filename):
        """Get the plugin name for a file.

        Arguments:
            filename: the name of the file you're interested in.

        Returns:
            str: the name of the plugin that handles this file.  If the file
                was measured, but didn't use a plugin, then "" is returned.
                If the file was not measured, then None is returned.

        """
        # Because the vast majority of files involve no plugin, we don't store
        # them explicitly in self._plugins.  Check the measured data instead
        # to see if it was a known file with no plugin.
        if filename in (self._arcs or self._lines):
            return self._plugins.get(filename, "")
        return None

    def measured_files(self):
        """A list of all files that had been measured."""
        return list(self._arcs or self._lines)

    def line_counts(self, fullpath=False):
        """Return a dict summarizing the line coverage data.

        Keys are based on the filenames, and values are the number of executed
        lines.  If `fullpath` is true, then the keys are the full pathnames of
        the files, otherwise they are the basenames of the files.

        Returns:
            dict mapping filenames to counts of lines.

        """
        summ = {}
        if fullpath:
            filename_fn = lambda f: f
        else:
            filename_fn = os.path.basename
        for filename in self.measured_files():
            summ[filename_fn(filename)] = len(self.lines(filename))
        return summ

    def __nonzero__(self):
        return bool(self._lines) or bool(self._arcs)

    __bool__ = __nonzero__

    def read(self, file_obj):
        """Read the coverage data from the given file object.

        Should only be used on an empty CoverageData object.

        """
        data = self._read_raw_data(file_obj)

        self._lines = data.get('lines', {})
        self._arcs = dict(
            (fname, [tuple(pair) for pair in arcs])
            for fname, arcs in iitems(data.get('arcs', {}))
        )
        self._plugins = data.get('plugins', {})

        self._validate()

    def read_file(self, filename):
        """Read the coverage data from `filename` into this object."""
        if self._debug and self._debug.should('dataio'):
            self._debug.write("Reading data from %r" % (filename,))
        try:
            with self._open_for_reading(filename) as f:
                self.read(f)
        except Exception as exc:
            raise CoverageException(
                "Couldn't read data from '%s': %s: %s" % (
                    filename, exc.__class__.__name__, exc,
                )
            )

    GO_AWAY = "!coverage.py: This is a private format, don't read it directly!"

    @classmethod
    def _open_for_reading(cls, filename):
        """Open a file appropriately for reading data."""
        f = open(filename, "r")
        try:
            go_away = f.read(len(cls.GO_AWAY))
            if go_away != cls.GO_AWAY:
                raise CoverageException("Doesn't seem to be a coverage.py data file")
        except Exception:
            f.close()
            raise
        return f

    @classmethod
    def _read_raw_data(cls, file_obj):
        """Read the raw data from a file object."""
        return json.load(file_obj)

    @classmethod
    def _read_raw_data_file(cls, filename):
        """Read the raw data from a file, for debugging."""
        with cls._open_for_reading(filename) as f:
            return cls._read_raw_data(f)

    ##
    ## Writing data
    ##

    def set_lines(self, line_data):
        """Add executed line data.

        `line_data` is a dictionary mapping filenames to dictionaries::

            { filename: { lineno: None, ... }, ...}

        Do not call this more than once, it will not update data, it only sets
        data.

        """
        if self._has_arcs():
            raise CoverageException("Can't add lines to existing arc data")

        for filename, linenos in iitems(line_data):
            self._lines[filename] = list(linenos)

        self._validate()

    def set_arcs(self, arc_data):
        """Add measured arc data.

        `arc_data` is { filename: { (l1,l2): None, ... }, ...}

        Do not call this more than once, it will not update data, it only sets
        data.

        """
        if self._has_lines():
            raise CoverageException("Can't add arcs to existing line data")

        for filename, arcs in iitems(arc_data):
            self._arcs[filename] = list(arcs)

        self._validate()

    def set_plugins(self, plugin_data):
        """Add per-file plugin information.

        `plugin_data` is { filename: plugin_name, ... }

        """
        existing_files = self._arcs or self._lines
        for filename, plugin_name in iitems(plugin_data):
            if filename not in existing_files:
                raise CoverageException(
                    "Can't add plugin data for unmeasured file '%s'" % (filename,)
                )
            existing_plugin = self._plugins.get(filename)
            if existing_plugin is not None and plugin_name != existing_plugin:
                raise CoverageException(
                    "Conflicting plugin name for '%s': %r vs %r" % (
                        filename, existing_plugin, plugin_name,
                    )
                )
            self._plugins[filename] = plugin_name

        self._validate()

    def touch_file(self, filename):
        """Ensure that `filename` appears in the data, empty if needed."""
        (self._arcs or self._lines).setdefault(filename, [])
        self._validate()

    def write(self, file_obj):
        """Write the coverage data to `file_obj`."""

        # Create the file data.
        file_data = {}

        if self._arcs:
            file_data['arcs'] = self._arcs
        else:
            file_data['lines'] = self._lines

        if self._plugins:
            file_data['plugins'] = self._plugins

        # Write the data to the file.
        file_obj.write(self.GO_AWAY)
        json.dump(file_data, file_obj)

    def write_file(self, filename):
        """Write the coverage data to `filename`."""
        if self._debug and self._debug.should('dataio'):
            self._debug.write("Writing data to %r" % (filename,))
        with open(filename, 'w') as fdata:
            self.write(fdata)

    def erase(self):
        """Erase the data in this object."""
        self._lines = {}
        self._arcs = {}
        self._plugins = {}
        self._validate()

    def update(self, other_data, aliases=None):
        """Update this data with data from another `CoverageData`.

        If `aliases` is provided, it's a `PathAliases` object that is used to
        re-map paths to match the local machine's.

        """
        if self._has_lines() and other_data._has_arcs():
            raise CoverageException("Can't combine arc data with line data")
        if self._has_arcs() and other_data._has_lines():
            raise CoverageException("Can't combine line data with arc data")

        aliases = aliases or PathAliases()

        # _plugins: only have a string, so they have to agree.
        # Have to do these first, so that our examination of self._arcs and
        # self._lines won't be confused by data updated from other_data.
        for filename in other_data.measured_files():
            other_plugin = other_data.plugin_name(filename)
            filename = aliases.map(filename)
            this_plugin = self.plugin_name(filename)
            if this_plugin is None:
                if other_plugin:
                    self._plugins[filename] = other_plugin
            elif this_plugin != other_plugin:
                raise CoverageException(
                    "Conflicting plugin name for '%s': %r vs %r" % (
                        filename, this_plugin, other_plugin,
                    )
                )

        # _lines: merge dicts.
        for filename, file_lines in iitems(other_data._lines):
            filename = aliases.map(filename)
            if filename in self._lines:
                lines = set(self._lines[filename])
                lines.update(file_lines)
                file_lines = list(lines)
            self._lines[filename] = file_lines

        # _arcs: merge dicts.
        for filename, file_arcs in iitems(other_data._arcs):
            filename = aliases.map(filename)
            if filename in self._arcs:
                arcs = set(self._arcs[filename])
                arcs.update(file_arcs)
                file_arcs = list(arcs)
            self._arcs[filename] = file_arcs

        self._validate()

    ##
    ## Miscellaneous
    ##

    def _validate(self):
        """If we are in paranoid mode, validate that everything is right."""
        if env.TESTING:
            self._validate_invariants()

    def _validate_invariants(self):
        """Validate internal invariants."""
        # Only one of _lines or _arcs should exist.
        assert not(self._has_lines() and self._has_arcs()), (
            "Shouldn't have both _lines and _arcs"
        )

        # _lines should be a dict of lists of ints.
        for fname, lines in iitems(self._lines):
            assert isinstance(fname, string_class), "Key in _lines shouldn't be %r" % (fname,)
            assert all(isinstance(x, int) for x in lines), (
                "_lines[%r] shouldn't be %r" % (fname, lines)
            )

        # _arcs should be a dict of lists of pairs of ints.
        for fname, arcs in iitems(self._arcs):
            assert isinstance(fname, string_class), "Key in _arcs shouldn't be %r" % (fname,)
            assert all(isinstance(x, int) and isinstance(y, int) for x, y in arcs), (
                "_arcs[%r] shouldn't be %r" % (fname, arcs)
            )

        # _plugins should have only non-empty strings as values.
        for fname, plugin in iitems(self._plugins):
            assert isinstance(fname, string_class), "Key in _plugins shouldn't be %r" % (fname,)
            assert plugin and isinstance(plugin, string_class), (
                "_plugins[%r] shoudn't be %r" % (fname, plugin)
            )

    def add_to_hash(self, filename, hasher):
        """Contribute `filename`'s data to the `hasher`.

        Arguments:
            filename (str): the filename we're interested in.
            hasher (:class:`coverage.misc.Hasher`): the Hasher to update with
                the file's data.

        """
        if self._arcs:
            hasher.update(sorted(self.arcs(filename)))
        else:
            hasher.update(sorted(self.lines(filename)))
        hasher.update(self.plugin_name(filename))

    ##
    ## Internal
    ##

    def _has_lines(self):
        """Do we have data in self._lines?"""
        return bool(self._lines)

    def _has_arcs(self):
        """Do we have data in self._arcs?"""
        return bool(self._arcs)


class CoverageDataFiles(object):
    """Manage the use of coverage data files."""

    def __init__(self, basename=None):
        """Create a CoverageDataFiles to manage data files.

        `basename` is the name of the file to use for storing data.

        """
        # Construct the filename that will be used for data storage.
        self.filename = os.path.abspath(basename or ".coverage")

    def erase(self):
        """Erase the data from the file storage."""
        file_be_gone(self.filename)

    def read(self, data):
        """Read the coverage data."""
        if os.path.exists(self.filename):
            data.read_file(self.filename)

    def write(self, data, suffix=None):
        """Write the collected coverage data to a file.

        `suffix` is a suffix to append to the base file name. This can be used
        for multiple or parallel execution, so that many coverage data files
        can exist simultaneously.  A dot will be used to join the base name and
        the suffix.

        """
        filename = self.filename
        if suffix is True:
            # If data_suffix was a simple true value, then make a suffix with
            # plenty of distinguishing information.  We do this here in
            # `save()` at the last minute so that the pid will be correct even
            # if the process forks.
            extra = ""
            if _TEST_NAME_FILE:                             # pragma: debugging
                with open(_TEST_NAME_FILE) as f:
                    test_name = f.read()
                extra = "." + test_name
            suffix = "%s%s.%s.%06d" % (
                socket.gethostname(), extra, os.getpid(),
                random.randint(0, 999999)
            )

        if suffix:
            filename += "." + suffix
        data.write_file(filename)

    def combine_parallel_data(self, data, aliases=None, data_dirs=None):
        """Combine a number of data files together.

        Treat `self.filename` as a file prefix, and combine the data from all
        of the data files starting with that prefix plus a dot.

        If `aliases` is provided, it's a `PathAliases` object that is used to
        re-map paths to match the local machine's.

        If `data_dirs` is provided, then it combines the data files from each
        directory into a single file.

        """
        data_dir, local = os.path.split(self.filename)
        localdot = local + '.*'

        data_dirs = data_dirs or [data_dir]
        files_to_combine = []
        for d in data_dirs:
            pattern = os.path.join(os.path.abspath(d), localdot)
            files_to_combine.extend(glob.glob(pattern))

        for f in files_to_combine:
            new_data = CoverageData()
            new_data.read_file(f)
            data.update(new_data, aliases=aliases)
            os.remove(f)


def debug_main(args):
    """Dump the raw data from data files.

    Run this as::

        $ python -m coverage.data [FILE]

    """
    for filename in (args or [".coverage"]):
        print("--- {0} ------------------------------".format(filename))
        data = CoverageData._read_raw_data_file(filename)
        print(pretty_data(data))


if __name__ == '__main__':
    import sys
    debug_main(sys.argv[1:])