diff options
author | Lorry <lorry@roadtrain.codethink.co.uk> | 2012-08-22 15:47:16 +0100 |
---|---|---|
committer | Lorry <lorry@roadtrain.codethink.co.uk> | 2012-08-22 15:47:16 +0100 |
commit | 25335618bf8755ce6b116ee14f47f5a1f2c821e9 (patch) | |
tree | d889d7ab3f9f985d0c54c534cb8052bd2e6d7163 /bzrlib/gpg.py | |
download | bzr-tarball-25335618bf8755ce6b116ee14f47f5a1f2c821e9.tar.gz |
Tarball conversion
Diffstat (limited to 'bzrlib/gpg.py')
-rw-r--r-- | bzrlib/gpg.py | 557 |
1 files changed, 557 insertions, 0 deletions
diff --git a/bzrlib/gpg.py b/bzrlib/gpg.py new file mode 100644 index 0000000..fc3a940 --- /dev/null +++ b/bzrlib/gpg.py @@ -0,0 +1,557 @@ +# Copyright (C) 2005, 2011 Canonical Ltd +# Authors: Robert Collins <robert.collins@canonical.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""GPG signing and checking logic.""" + +from __future__ import absolute_import + +import os +import sys +from StringIO import StringIO + +from bzrlib.lazy_import import lazy_import +lazy_import(globals(), """ +import errno +import subprocess + +from bzrlib import ( + config, + errors, + trace, + ui, + ) +from bzrlib.i18n import ( + gettext, + ngettext, + ) +""") + +from bzrlib.symbol_versioning import ( + deprecated_in, + deprecated_method, + ) + +#verification results +SIGNATURE_VALID = 0 +SIGNATURE_KEY_MISSING = 1 +SIGNATURE_NOT_VALID = 2 +SIGNATURE_NOT_SIGNED = 3 +SIGNATURE_EXPIRED = 4 + + +def bulk_verify_signatures(repository, revids, strategy, + process_events_callback=None): + """Do verifications on a set of revisions + + :param repository: repository object + :param revids: list of revision ids to verify + :param strategy: GPG strategy to use + :param process_events_callback: method to call for GUI frontends that + want to keep their UI refreshed + + :return: count dictionary of results of each type, + result list for each revision, + boolean True if all results are verified successfully + """ + count = {SIGNATURE_VALID: 0, + SIGNATURE_KEY_MISSING: 0, + SIGNATURE_NOT_VALID: 0, + SIGNATURE_NOT_SIGNED: 0, + SIGNATURE_EXPIRED: 0} + result = [] + all_verifiable = True + total = len(revids) + pb = ui.ui_factory.nested_progress_bar() + try: + for i, (rev_id, verification_result, uid) in enumerate( + repository.verify_revision_signatures( + revids, strategy)): + pb.update("verifying signatures", i, total) + result.append([rev_id, verification_result, uid]) + count[verification_result] += 1 + if verification_result != SIGNATURE_VALID: + all_verifiable = False + if process_events_callback is not None: + process_events_callback() + finally: + pb.finished() + return (count, result, all_verifiable) + + +class DisabledGPGStrategy(object): + """A GPG Strategy that makes everything fail.""" + + @staticmethod + def verify_signatures_available(): + return True + + def __init__(self, ignored): + """Real strategies take a configuration.""" + + def sign(self, content): + raise errors.SigningFailed('Signing is disabled.') + + def verify(self, content, testament): + raise errors.SignatureVerificationFailed('Signature verification is \ +disabled.') + + def set_acceptable_keys(self, command_line_input): + pass + + +class LoopbackGPGStrategy(object): + """A GPG Strategy that acts like 'cat' - data is just passed through. + Used in tests. + """ + + @staticmethod + def verify_signatures_available(): + return True + + def __init__(self, ignored): + """Real strategies take a configuration.""" + + def sign(self, content): + return ("-----BEGIN PSEUDO-SIGNED CONTENT-----\n" + content + + "-----END PSEUDO-SIGNED CONTENT-----\n") + + def verify(self, content, testament): + return SIGNATURE_VALID, None + + def set_acceptable_keys(self, command_line_input): + if command_line_input is not None: + patterns = command_line_input.split(",") + self.acceptable_keys = [] + for pattern in patterns: + if pattern == "unknown": + pass + else: + self.acceptable_keys.append(pattern) + + @deprecated_method(deprecated_in((2, 6, 0))) + def do_verifications(self, revisions, repository): + return bulk_verify_signatures(repository, revisions, self) + + @deprecated_method(deprecated_in((2, 6, 0))) + def valid_commits_message(self, count): + return valid_commits_message(count) + + @deprecated_method(deprecated_in((2, 6, 0))) + def unknown_key_message(self, count): + return unknown_key_message(count) + + @deprecated_method(deprecated_in((2, 6, 0))) + def commit_not_valid_message(self, count): + return commit_not_valid_message(count) + + @deprecated_method(deprecated_in((2, 6, 0))) + def commit_not_signed_message(self, count): + return commit_not_signed_message(count) + + @deprecated_method(deprecated_in((2, 6, 0))) + def expired_commit_message(self, count): + return expired_commit_message(count) + + +def _set_gpg_tty(): + tty = os.environ.get('TTY') + if tty is not None: + os.environ['GPG_TTY'] = tty + trace.mutter('setting GPG_TTY=%s', tty) + else: + # This is not quite worthy of a warning, because some people + # don't need GPG_TTY to be set. But it is worthy of a big mark + # in ~/.bzr.log, so that people can debug it if it happens to them + trace.mutter('** Env var TTY empty, cannot set GPG_TTY.' + ' Is TTY exported?') + + +class GPGStrategy(object): + """GPG Signing and checking facilities.""" + + acceptable_keys = None + + def __init__(self, config_stack): + self._config_stack = config_stack + try: + import gpgme + self.context = gpgme.Context() + except ImportError, error: + pass # can't use verify() + + @staticmethod + def verify_signatures_available(): + """ + check if this strategy can verify signatures + + :return: boolean if this strategy can verify signatures + """ + try: + import gpgme + return True + except ImportError, error: + return False + + def _command_line(self): + key = self._config_stack.get('gpg_signing_key') + if key is None or key == 'default': + # 'default' or not setting gpg_signing_key at all means we should + # use the user email address + key = config.extract_email_address(self._config_stack.get('email')) + return [self._config_stack.get('gpg_signing_command'), '--clearsign', + '-u', key] + + def sign(self, content): + if isinstance(content, unicode): + raise errors.BzrBadParameterUnicode('content') + ui.ui_factory.clear_term() + + preexec_fn = _set_gpg_tty + if sys.platform == 'win32': + # Win32 doesn't support preexec_fn, but wouldn't support TTY anyway. + preexec_fn = None + try: + process = subprocess.Popen(self._command_line(), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + preexec_fn=preexec_fn) + try: + result = process.communicate(content)[0] + if process.returncode is None: + process.wait() + if process.returncode != 0: + raise errors.SigningFailed(self._command_line()) + return result + except OSError, e: + if e.errno == errno.EPIPE: + raise errors.SigningFailed(self._command_line()) + else: + raise + except ValueError: + # bad subprocess parameters, should never happen. + raise + except OSError, e: + if e.errno == errno.ENOENT: + # gpg is not installed + raise errors.SigningFailed(self._command_line()) + else: + raise + + def verify(self, content, testament): + """Check content has a valid signature. + + :param content: the commit signature + :param testament: the valid testament string for the commit + + :return: SIGNATURE_VALID or a failed SIGNATURE_ value, key uid if valid + """ + try: + import gpgme + except ImportError, error: + raise errors.GpgmeNotInstalled(error) + + signature = StringIO(content) + plain_output = StringIO() + try: + result = self.context.verify(signature, None, plain_output) + except gpgme.GpgmeError,error: + raise errors.SignatureVerificationFailed(error[2]) + + # No result if input is invalid. + # test_verify_invalid() + if len(result) == 0: + return SIGNATURE_NOT_VALID, None + # User has specified a list of acceptable keys, check our result is in + # it. test_verify_unacceptable_key() + fingerprint = result[0].fpr + if self.acceptable_keys is not None: + if not fingerprint in self.acceptable_keys: + return SIGNATURE_KEY_MISSING, fingerprint[-8:] + # Check the signature actually matches the testament. + # test_verify_bad_testament() + if testament != plain_output.getvalue(): + return SIGNATURE_NOT_VALID, None + # Yay gpgme set the valid bit. + # Can't write a test for this one as you can't set a key to be + # trusted using gpgme. + if result[0].summary & gpgme.SIGSUM_VALID: + key = self.context.get_key(fingerprint) + name = key.uids[0].name + email = key.uids[0].email + return SIGNATURE_VALID, name + " <" + email + ">" + # Sigsum_red indicates a problem, unfortunatly I have not been able + # to write any tests which actually set this. + if result[0].summary & gpgme.SIGSUM_RED: + return SIGNATURE_NOT_VALID, None + # GPG does not know this key. + # test_verify_unknown_key() + if result[0].summary & gpgme.SIGSUM_KEY_MISSING: + return SIGNATURE_KEY_MISSING, fingerprint[-8:] + # Summary isn't set if sig is valid but key is untrusted but if user + # has explicity set the key as acceptable we can validate it. + if result[0].summary == 0 and self.acceptable_keys is not None: + if fingerprint in self.acceptable_keys: + # test_verify_untrusted_but_accepted() + return SIGNATURE_VALID, None + # test_verify_valid_but_untrusted() + if result[0].summary == 0 and self.acceptable_keys is None: + return SIGNATURE_NOT_VALID, None + if result[0].summary & gpgme.SIGSUM_KEY_EXPIRED: + expires = self.context.get_key(result[0].fpr).subkeys[0].expires + if expires > result[0].timestamp: + # The expired key was not expired at time of signing. + # test_verify_expired_but_valid() + return SIGNATURE_EXPIRED, fingerprint[-8:] + else: + # I can't work out how to create a test where the signature + # was expired at the time of signing. + return SIGNATURE_NOT_VALID, None + # A signature from a revoked key gets this. + # test_verify_revoked_signature() + if result[0].summary & gpgme.SIGSUM_SYS_ERROR: + return SIGNATURE_NOT_VALID, None + # Other error types such as revoked keys should (I think) be caught by + # SIGSUM_RED so anything else means something is buggy. + raise errors.SignatureVerificationFailed("Unknown GnuPG key "\ + "verification result") + + def set_acceptable_keys(self, command_line_input): + """Set the acceptable keys for verifying with this GPGStrategy. + + :param command_line_input: comma separated list of patterns from + command line + :return: nothing + """ + key_patterns = None + acceptable_keys_config = self._config_stack.get('acceptable_keys') + try: + if isinstance(acceptable_keys_config, unicode): + acceptable_keys_config = str(acceptable_keys_config) + except UnicodeEncodeError: + # gpg Context.keylist(pattern) does not like unicode + raise errors.BzrCommandError( + gettext('Only ASCII permitted in option names')) + + if acceptable_keys_config is not None: + key_patterns = acceptable_keys_config + if command_line_input is not None: # command line overrides config + key_patterns = command_line_input + if key_patterns is not None: + patterns = key_patterns.split(",") + + self.acceptable_keys = [] + for pattern in patterns: + result = self.context.keylist(pattern) + found_key = False + for key in result: + found_key = True + self.acceptable_keys.append(key.subkeys[0].fpr) + trace.mutter("Added acceptable key: " + key.subkeys[0].fpr) + if not found_key: + trace.note(gettext( + "No GnuPG key results for pattern: {0}" + ).format(pattern)) + + @deprecated_method(deprecated_in((2, 6, 0))) + def do_verifications(self, revisions, repository, + process_events_callback=None): + """do verifications on a set of revisions + + :param revisions: list of revision ids to verify + :param repository: repository object + :param process_events_callback: method to call for GUI frontends that + want to keep their UI refreshed + + :return: count dictionary of results of each type, + result list for each revision, + boolean True if all results are verified successfully + """ + return bulk_verify_signatures(repository, revisions, self, + process_events_callback) + + @deprecated_method(deprecated_in((2, 6, 0))) + def verbose_valid_message(self, result): + """takes a verify result and returns list of signed commits strings""" + return verbose_valid_message(result) + + @deprecated_method(deprecated_in((2, 6, 0))) + def verbose_not_valid_message(self, result, repo): + """takes a verify result and returns list of not valid commit info""" + return verbose_not_valid_message(result, repo) + + @deprecated_method(deprecated_in((2, 6, 0))) + def verbose_not_signed_message(self, result, repo): + """takes a verify result and returns list of not signed commit info""" + return verbose_not_valid_message(result, repo) + + @deprecated_method(deprecated_in((2, 6, 0))) + def verbose_missing_key_message(self, result): + """takes a verify result and returns list of missing key info""" + return verbose_missing_key_message(result) + + @deprecated_method(deprecated_in((2, 6, 0))) + def verbose_expired_key_message(self, result, repo): + """takes a verify result and returns list of expired key info""" + return verbose_expired_key_message(result, repo) + + @deprecated_method(deprecated_in((2, 6, 0))) + def valid_commits_message(self, count): + """returns message for number of commits""" + return valid_commits_message(count) + + @deprecated_method(deprecated_in((2, 6, 0))) + def unknown_key_message(self, count): + """returns message for number of commits""" + return unknown_key_message(count) + + @deprecated_method(deprecated_in((2, 6, 0))) + def commit_not_valid_message(self, count): + """returns message for number of commits""" + return commit_not_valid_message(count) + + @deprecated_method(deprecated_in((2, 6, 0))) + def commit_not_signed_message(self, count): + """returns message for number of commits""" + return commit_not_signed_message(count) + + @deprecated_method(deprecated_in((2, 6, 0))) + def expired_commit_message(self, count): + """returns message for number of commits""" + return expired_commit_message(count) + + +def valid_commits_message(count): + """returns message for number of commits""" + return gettext(u"{0} commits with valid signatures").format( + count[SIGNATURE_VALID]) + + +def unknown_key_message(count): + """returns message for number of commits""" + return ngettext(u"{0} commit with unknown key", + u"{0} commits with unknown keys", + count[SIGNATURE_KEY_MISSING]).format( + count[SIGNATURE_KEY_MISSING]) + + +def commit_not_valid_message(count): + """returns message for number of commits""" + return ngettext(u"{0} commit not valid", + u"{0} commits not valid", + count[SIGNATURE_NOT_VALID]).format( + count[SIGNATURE_NOT_VALID]) + + +def commit_not_signed_message(count): + """returns message for number of commits""" + return ngettext(u"{0} commit not signed", + u"{0} commits not signed", + count[SIGNATURE_NOT_SIGNED]).format( + count[SIGNATURE_NOT_SIGNED]) + + +def expired_commit_message(count): + """returns message for number of commits""" + return ngettext(u"{0} commit with key now expired", + u"{0} commits with key now expired", + count[SIGNATURE_EXPIRED]).format( + count[SIGNATURE_EXPIRED]) + + +def verbose_expired_key_message(result, repo): + """takes a verify result and returns list of expired key info""" + signers = {} + fingerprint_to_authors = {} + for rev_id, validity, fingerprint in result: + if validity == SIGNATURE_EXPIRED: + revision = repo.get_revision(rev_id) + authors = ', '.join(revision.get_apparent_authors()) + signers.setdefault(fingerprint, 0) + signers[fingerprint] += 1 + fingerprint_to_authors[fingerprint] = authors + result = [] + for fingerprint, number in signers.items(): + result.append( + ngettext(u"{0} commit by author {1} with key {2} now expired", + u"{0} commits by author {1} with key {2} now expired", + number).format( + number, fingerprint_to_authors[fingerprint], fingerprint)) + return result + + +def verbose_valid_message(result): + """takes a verify result and returns list of signed commits strings""" + signers = {} + for rev_id, validity, uid in result: + if validity == SIGNATURE_VALID: + signers.setdefault(uid, 0) + signers[uid] += 1 + result = [] + for uid, number in signers.items(): + result.append(ngettext(u"{0} signed {1} commit", + u"{0} signed {1} commits", + number).format(uid, number)) + return result + + +def verbose_not_valid_message(result, repo): + """takes a verify result and returns list of not valid commit info""" + signers = {} + for rev_id, validity, empty in result: + if validity == SIGNATURE_NOT_VALID: + revision = repo.get_revision(rev_id) + authors = ', '.join(revision.get_apparent_authors()) + signers.setdefault(authors, 0) + signers[authors] += 1 + result = [] + for authors, number in signers.items(): + result.append(ngettext(u"{0} commit by author {1}", + u"{0} commits by author {1}", + number).format(number, authors)) + return result + + +def verbose_not_signed_message(result, repo): + """takes a verify result and returns list of not signed commit info""" + signers = {} + for rev_id, validity, empty in result: + if validity == SIGNATURE_NOT_SIGNED: + revision = repo.get_revision(rev_id) + authors = ', '.join(revision.get_apparent_authors()) + signers.setdefault(authors, 0) + signers[authors] += 1 + result = [] + for authors, number in signers.items(): + result.append(ngettext(u"{0} commit by author {1}", + u"{0} commits by author {1}", + number).format(number, authors)) + return result + + +def verbose_missing_key_message(result): + """takes a verify result and returns list of missing key info""" + signers = {} + for rev_id, validity, fingerprint in result: + if validity == SIGNATURE_KEY_MISSING: + signers.setdefault(fingerprint, 0) + signers[fingerprint] += 1 + result = [] + for fingerprint, number in signers.items(): + result.append(ngettext(u"Unknown key {0} signed {1} commit", + u"Unknown key {0} signed {1} commits", + number).format(fingerprint, number)) + return result |