summaryrefslogtreecommitdiff
path: root/hgext/gpg.py
diff options
context:
space:
mode:
Diffstat (limited to 'hgext/gpg.py')
-rw-r--r--hgext/gpg.py289
1 files changed, 289 insertions, 0 deletions
diff --git a/hgext/gpg.py b/hgext/gpg.py
new file mode 100644
index 0000000..2ded54c
--- /dev/null
+++ b/hgext/gpg.py
@@ -0,0 +1,289 @@
+# Copyright 2005, 2006 Benoit Boissinot <benoit.boissinot@ens-lyon.org>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+'''commands to sign and verify changesets'''
+
+import os, tempfile, binascii
+from mercurial import util, commands, match, cmdutil
+from mercurial import node as hgnode
+from mercurial.i18n import _
+
+cmdtable = {}
+command = cmdutil.command(cmdtable)
+testedwith = 'internal'
+
+class gpg(object):
+ def __init__(self, path, key=None):
+ self.path = path
+ self.key = (key and " --local-user \"%s\"" % key) or ""
+
+ def sign(self, data):
+ gpgcmd = "%s --sign --detach-sign%s" % (self.path, self.key)
+ return util.filter(data, gpgcmd)
+
+ def verify(self, data, sig):
+ """ returns of the good and bad signatures"""
+ sigfile = datafile = None
+ try:
+ # create temporary files
+ fd, sigfile = tempfile.mkstemp(prefix="hg-gpg-", suffix=".sig")
+ fp = os.fdopen(fd, 'wb')
+ fp.write(sig)
+ fp.close()
+ fd, datafile = tempfile.mkstemp(prefix="hg-gpg-", suffix=".txt")
+ fp = os.fdopen(fd, 'wb')
+ fp.write(data)
+ fp.close()
+ gpgcmd = ("%s --logger-fd 1 --status-fd 1 --verify "
+ "\"%s\" \"%s\"" % (self.path, sigfile, datafile))
+ ret = util.filter("", gpgcmd)
+ finally:
+ for f in (sigfile, datafile):
+ try:
+ if f:
+ os.unlink(f)
+ except OSError:
+ pass
+ keys = []
+ key, fingerprint = None, None
+ err = ""
+ for l in ret.splitlines():
+ # see DETAILS in the gnupg documentation
+ # filter the logger output
+ if not l.startswith("[GNUPG:]"):
+ continue
+ l = l[9:]
+ if l.startswith("ERRSIG"):
+ err = _("error while verifying signature")
+ break
+ elif l.startswith("VALIDSIG"):
+ # fingerprint of the primary key
+ fingerprint = l.split()[10]
+ elif (l.startswith("GOODSIG") or
+ l.startswith("EXPSIG") or
+ l.startswith("EXPKEYSIG") or
+ l.startswith("BADSIG")):
+ if key is not None:
+ keys.append(key + [fingerprint])
+ key = l.split(" ", 2)
+ fingerprint = None
+ if err:
+ return err, []
+ if key is not None:
+ keys.append(key + [fingerprint])
+ return err, keys
+
+def newgpg(ui, **opts):
+ """create a new gpg instance"""
+ gpgpath = ui.config("gpg", "cmd", "gpg")
+ gpgkey = opts.get('key')
+ if not gpgkey:
+ gpgkey = ui.config("gpg", "key", None)
+ return gpg(gpgpath, gpgkey)
+
+def sigwalk(repo):
+ """
+ walk over every sigs, yields a couple
+ ((node, version, sig), (filename, linenumber))
+ """
+ def parsefile(fileiter, context):
+ ln = 1
+ for l in fileiter:
+ if not l:
+ continue
+ yield (l.split(" ", 2), (context, ln))
+ ln += 1
+
+ # read the heads
+ fl = repo.file(".hgsigs")
+ for r in reversed(fl.heads()):
+ fn = ".hgsigs|%s" % hgnode.short(r)
+ for item in parsefile(fl.read(r).splitlines(), fn):
+ yield item
+ try:
+ # read local signatures
+ fn = "localsigs"
+ for item in parsefile(repo.opener(fn), fn):
+ yield item
+ except IOError:
+ pass
+
+def getkeys(ui, repo, mygpg, sigdata, context):
+ """get the keys who signed a data"""
+ fn, ln = context
+ node, version, sig = sigdata
+ prefix = "%s:%d" % (fn, ln)
+ node = hgnode.bin(node)
+
+ data = node2txt(repo, node, version)
+ sig = binascii.a2b_base64(sig)
+ err, keys = mygpg.verify(data, sig)
+ if err:
+ ui.warn("%s:%d %s\n" % (fn, ln , err))
+ return None
+
+ validkeys = []
+ # warn for expired key and/or sigs
+ for key in keys:
+ if key[0] == "BADSIG":
+ ui.write(_("%s Bad signature from \"%s\"\n") % (prefix, key[2]))
+ continue
+ if key[0] == "EXPSIG":
+ ui.write(_("%s Note: Signature has expired"
+ " (signed by: \"%s\")\n") % (prefix, key[2]))
+ elif key[0] == "EXPKEYSIG":
+ ui.write(_("%s Note: This key has expired"
+ " (signed by: \"%s\")\n") % (prefix, key[2]))
+ validkeys.append((key[1], key[2], key[3]))
+ return validkeys
+
+@command("sigs", [], _('hg sigs'))
+def sigs(ui, repo):
+ """list signed changesets"""
+ mygpg = newgpg(ui)
+ revs = {}
+
+ for data, context in sigwalk(repo):
+ node, version, sig = data
+ fn, ln = context
+ try:
+ n = repo.lookup(node)
+ except KeyError:
+ ui.warn(_("%s:%d node does not exist\n") % (fn, ln))
+ continue
+ r = repo.changelog.rev(n)
+ keys = getkeys(ui, repo, mygpg, data, context)
+ if not keys:
+ continue
+ revs.setdefault(r, [])
+ revs[r].extend(keys)
+ for rev in sorted(revs, reverse=True):
+ for k in revs[rev]:
+ r = "%5d:%s" % (rev, hgnode.hex(repo.changelog.node(rev)))
+ ui.write("%-30s %s\n" % (keystr(ui, k), r))
+
+@command("sigcheck", [], _('hg sigcheck REV'))
+def check(ui, repo, rev):
+ """verify all the signatures there may be for a particular revision"""
+ mygpg = newgpg(ui)
+ rev = repo.lookup(rev)
+ hexrev = hgnode.hex(rev)
+ keys = []
+
+ for data, context in sigwalk(repo):
+ node, version, sig = data
+ if node == hexrev:
+ k = getkeys(ui, repo, mygpg, data, context)
+ if k:
+ keys.extend(k)
+
+ if not keys:
+ ui.write(_("no valid signature for %s\n") % hgnode.short(rev))
+ return
+
+ # print summary
+ ui.write("%s is signed by:\n" % hgnode.short(rev))
+ for key in keys:
+ ui.write(" %s\n" % keystr(ui, key))
+
+def keystr(ui, key):
+ """associate a string to a key (username, comment)"""
+ keyid, user, fingerprint = key
+ comment = ui.config("gpg", fingerprint, None)
+ if comment:
+ return "%s (%s)" % (user, comment)
+ else:
+ return user
+
+@command("sign",
+ [('l', 'local', None, _('make the signature local')),
+ ('f', 'force', None, _('sign even if the sigfile is modified')),
+ ('', 'no-commit', None, _('do not commit the sigfile after signing')),
+ ('k', 'key', '',
+ _('the key id to sign with'), _('ID')),
+ ('m', 'message', '',
+ _('commit message'), _('TEXT')),
+ ] + commands.commitopts2,
+ _('hg sign [OPTION]... [REV]...'))
+def sign(ui, repo, *revs, **opts):
+ """add a signature for the current or given revision
+
+ If no revision is given, the parent of the working directory is used,
+ or tip if no revision is checked out.
+
+ See :hg:`help dates` for a list of formats valid for -d/--date.
+ """
+
+ mygpg = newgpg(ui, **opts)
+ sigver = "0"
+ sigmessage = ""
+
+ date = opts.get('date')
+ if date:
+ opts['date'] = util.parsedate(date)
+
+ if revs:
+ nodes = [repo.lookup(n) for n in revs]
+ else:
+ nodes = [node for node in repo.dirstate.parents()
+ if node != hgnode.nullid]
+ if len(nodes) > 1:
+ raise util.Abort(_('uncommitted merge - please provide a '
+ 'specific revision'))
+ if not nodes:
+ nodes = [repo.changelog.tip()]
+
+ for n in nodes:
+ hexnode = hgnode.hex(n)
+ ui.write(_("signing %d:%s\n") % (repo.changelog.rev(n),
+ hgnode.short(n)))
+ # build data
+ data = node2txt(repo, n, sigver)
+ sig = mygpg.sign(data)
+ if not sig:
+ raise util.Abort(_("error while signing"))
+ sig = binascii.b2a_base64(sig)
+ sig = sig.replace("\n", "")
+ sigmessage += "%s %s %s\n" % (hexnode, sigver, sig)
+
+ # write it
+ if opts['local']:
+ repo.opener.append("localsigs", sigmessage)
+ return
+
+ msigs = match.exact(repo.root, '', ['.hgsigs'])
+ s = repo.status(match=msigs, unknown=True, ignored=True)[:6]
+ if util.any(s) and not opts["force"]:
+ raise util.Abort(_("working copy of .hgsigs is changed "
+ "(please commit .hgsigs manually "
+ "or use --force)"))
+
+ sigsfile = repo.wfile(".hgsigs", "ab")
+ sigsfile.write(sigmessage)
+ sigsfile.close()
+
+ if '.hgsigs' not in repo.dirstate:
+ repo[None].add([".hgsigs"])
+
+ if opts["no_commit"]:
+ return
+
+ message = opts['message']
+ if not message:
+ # we don't translate commit messages
+ message = "\n".join(["Added signature for changeset %s"
+ % hgnode.short(n)
+ for n in nodes])
+ try:
+ repo.commit(message, opts['user'], opts['date'], match=msigs)
+ except ValueError, inst:
+ raise util.Abort(str(inst))
+
+def node2txt(repo, node, ver):
+ """map a manifest into some text"""
+ if ver == "0":
+ return "%s\n" % hgnode.hex(node)
+ else:
+ raise util.Abort(_("unknown signature version"))