summaryrefslogtreecommitdiff
path: root/bzrlib/plugins/changelog_merge/changelog_merge.py
blob: 2b3ce8a26833d59524ec807d97c39b75537edcc8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# Copyright (C) 2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Merge logic for changelog_merge plugin."""

from __future__ import absolute_import

import difflib

from bzrlib import (
    debug,
    merge,
    urlutils,
    )
from bzrlib.merge3 import Merge3
from bzrlib.trace import mutter


def changelog_entries(lines):
    """Return a list of changelog entries.

    A line whose first character is not a space, tab or newline starts a new
    entry; every following indented or blank line is appended to that entry.
    Indented or blank lines that occur before the first entry header are
    collected into an entry of their own.

    :param lines: lines of a changelog file.
    :returns: list of entries.  Each entry is a tuple of lines.
    """
    entries = []
    for line in lines:
        # Slice instead of indexing so that an empty string is treated as a
        # continuation line rather than raising IndexError.
        if line[:1] in ('', ' ', '\t', '\n'):
            if not entries:
                # Cope with leading blank lines.
                entries.append([])
            entries[-1].append(line)
        else:
            # new entry
            entries.append([line])
    # Build a real list (map() is a lazy, one-shot iterator on Python 3) so
    # that the result can be indexed and iterated more than once.
    return [tuple(entry) for entry in entries]


def entries_to_lines(entries):
    """Lazily yield each line of each entry, in order, as one flat stream."""
    flattened = (text for group in entries for text in group)
    for text in flattened:
        yield text


class ChangeLogMerger(merge.ConfigurableFileMerger):
    """Per-file merger for ChangeLog files in the GNU format."""

    name_prefix = "changelog"

    def get_filepath(self, params, tree):
        """Return the name used to match this file against the config.

        Only the basename of the file's path in ``tree`` is returned, not the
        full path.  That way a setting such as
        ``changelog_merge_files = ChangeLog`` matches every ChangeLog file in
        the tree rather than only one at the tree root.

        :param params: A MergeHookParams describing the file to merge
        :param tree: a Tree, e.g. self.merger.this_tree.
        """
        full_path = tree.id2path(params.file_id)
        return urlutils.basename(full_path)

    def merge_text(self, params):
        """Merge the three versions of a changelog entry by entry.

         * new entries from other will float to the top
         * edits to older entries are preserved

        :param params: object providing ``this_lines``, ``other_lines`` and
            ``base_lines``.
        :returns: ``('success', lines)`` where lines is an iterable of the
            merged lines, or ``('not_applicable', None)`` if the entry merge
            raised EntryConflict.
        """
        # Split each version of the file into changelog entries.
        ours = changelog_entries(params.this_lines)
        theirs = changelog_entries(params.other_lines)
        ancestor = changelog_entries(params.base_lines)
        try:
            merged = merge_entries(ancestor, ours, theirs)
        except EntryConflict:
            # XXX: generating a nice conflict file would be better
            outcome = 'not_applicable', None
        else:
            # Flatten the merged entries back into plain lines.
            outcome = 'success', entries_to_lines(merged)
        return outcome


class EntryConflict(Exception):
    """Signals that changelog entries could not be merged automatically."""


def default_guess_edits(new_entries, deleted_entries, entry_as_str=''.join):
    """Default implementation of guess_edits param of merge_entries.

    Repeatedly pairs the most similar (new entry, deleted entry) couple, as
    scored by difflib.SequenceMatcher.ratio(), and reports the pair as an
    edit.  It stops once no remaining pair scores strictly above the cutoff
    of 0.8.

    Every round compares each remaining new entry with each remaining
    deleted entry, and each round removes one entry from both sides, so this
    is expensive for large inputs; it shouldn't be used very often.

    :param new_entries: entries that were added.
    :param deleted_entries: entries that were removed.
    :param entry_as_str: callable turning one entry into the string that is
        compared; by default the entry's lines are concatenated.
    :returns: a tuple ``(new, deleted, edits)``.  ``new`` and ``deleted`` are
        lists of the entries left unpaired, ``edits`` is a list of
        ``(deleted_entry, new_entry)`` tuples.
    """
    # Real lists are required here (not lazy map objects): these sequences
    # are iterated once per round and shrunk in step with result_new and
    # result_deleted.
    deleted_entries_as_strs = [
        entry_as_str(entry) for entry in deleted_entries]
    new_entries_as_strs = [entry_as_str(entry) for entry in new_entries]
    result_new = list(new_entries)
    result_deleted = list(deleted_entries)
    result_edits = []
    sm = difflib.SequenceMatcher()
    CUTOFF = 0.8
    while True:
        best = None
        best_score = CUTOFF
        # Compare each new entry with each old entry to find the best match.
        # The strict '>' means the first pair reaching the top score wins.
        for new_index, new_entry_as_str in enumerate(new_entries_as_strs):
            sm.set_seq1(new_entry_as_str)
            for del_index, old_entry_as_str in enumerate(
                    deleted_entries_as_strs):
                sm.set_seq2(old_entry_as_str)
                score = sm.ratio()
                if score > best_score:
                    best = new_index, del_index
                    best_score = score
        if best is None:
            # No match better than CUTOFF exists in the remaining new and old
            # entries.
            break
        # Add the best match to the list of edits, and remove it from the
        # lists of new/old entries so it is not considered in the next round.
        new_index, del_index = best
        result_edits.append(
            (result_deleted[del_index], result_new[new_index]))
        del deleted_entries_as_strs[del_index], result_deleted[del_index]
        del new_entries_as_strs[new_index], result_new[new_index]
    return result_new, result_deleted, result_edits


def merge_entries(base_entries, this_entries, other_entries,
        guess_edits=default_guess_edits):
    """Three-way merge of changelog entries.

    The entries are run through Merge3 and the resulting groups are
    processed in order.  Non-conflicting groups are copied to the result
    unchanged.  For a conflicting group, the additions and deletions made by
    other are applied on top of this.  Only in the very first group (the top
    of the file) are additions floated above everything else and
    ``guess_edits`` consulted to pair up deletions and additions as edits.

    :param base_entries: entries of the common ancestor.
    :param this_entries: entries of this version.
    :param other_entries: entries of the other version.
    :param guess_edits: callable taking ``(new, deleted)`` and returning
        ``(new, deleted, edits)`` with edits as ``(old, new)`` pairs.
    :returns: list of merged entries.
    :raises EntryConflict: if an entry edited in other is no longer present
        in this.
    """
    m3 = Merge3(base_entries, this_entries, other_entries, allow_objects=True)
    merged = []
    for position, group in enumerate(m3.merge_groups()):
        # Only the first group counts as the top of the changelog.
        first_group = position == 0
        if 'changelog_merge' in debug.debug_flags:
            mutter('merge group:\n%r', group)
        if group[0] != 'conflict':
            # unchanged, same, a, or b: take the entries verbatim.
            merged.extend(group[1])
            continue
        _, base, this, other = group
        # What did other add and remove relative to base?
        added = [entry for entry in other if entry not in base]
        removed = [entry for entry in base if entry not in other]
        # Changes not made at the top are always preserved as is, so there
        # is no need to try to distinguish edits from adds and deletes.
        edits = []
        if first_group and removed:
            # Magic!  Compare deletions and additions to try to spot edits.
            added, removed, edits = guess_edits(added, removed)
        if 'changelog_merge' in debug.debug_flags:
            mutter('at_top: %r', first_group)
            mutter('new_in_other: %r', added)
            mutter('deleted_in_other: %r', removed)
            mutter('edits_in_other: %r', edits)
        # Drop what other deleted, then substitute what other edited.
        rewritten_this = [entry for entry in this if entry not in removed]
        for old_entry, new_entry in edits:
            try:
                slot = rewritten_this.index(old_entry)
            except ValueError:
                # The edited entry is no longer present in this!  Just give
                # up and declare a conflict.
                raise EntryConflict()
            rewritten_this[slot] = new_entry
        if 'changelog_merge' in debug.debug_flags:
            mutter('updated_this: %r', rewritten_this)
        if first_group:
            # Float new entries from other to the top.
            merged = added + merged
        else:
            merged.extend(added)
        merged.extend(rewritten_this)
    return merged