diff options
Diffstat (limited to 'dns/dnssec.py')
| -rw-r--r-- | dns/dnssec.py | 429 |
1 files changed, 0 insertions, 429 deletions
diff --git a/dns/dnssec.py b/dns/dnssec.py deleted file mode 100644 index b9bc874..0000000 --- a/dns/dnssec.py +++ /dev/null @@ -1,429 +0,0 @@ -# Copyright (C) 2003-2007, 2009, 2011 Nominum, Inc. -# -# Permission to use, copy, modify, and distribute this software and its -# documentation for any purpose with or without fee is hereby granted, -# provided that the above copyright notice and this permission notice -# appear in all copies. -# -# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES -# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR -# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN -# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT -# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -"""Common DNSSEC-related functions and constants.""" - -import io -import struct -import time - -import dns.exception -import dns.hash -import dns.name -import dns.node -import dns.rdataset -import dns.rdata -import dns.rdatatype -import dns.rdataclass - -class UnsupportedAlgorithm(dns.exception.DNSException): - """The DNSSEC algorithm is not supported.""" - -class ValidationFailure(dns.exception.DNSException): - """The DNSSEC signature is invalid.""" - -RSAMD5 = 1 -DH = 2 -DSA = 3 -ECC = 4 -RSASHA1 = 5 -DSANSEC3SHA1 = 6 -RSASHA1NSEC3SHA1 = 7 -RSASHA256 = 8 -RSASHA512 = 10 -ECDSAP256SHA256 = 13 -ECDSAP384SHA384 = 14 -INDIRECT = 252 -PRIVATEDNS = 253 -PRIVATEOID = 254 - -_algorithm_by_text = { - 'RSAMD5' : RSAMD5, - 'DH' : DH, - 'DSA' : DSA, - 'ECC' : ECC, - 'RSASHA1' : RSASHA1, - 'DSANSEC3SHA1' : DSANSEC3SHA1, - 'RSASHA1NSEC3SHA1' : RSASHA1NSEC3SHA1, - 'RSASHA256' : RSASHA256, - 'RSASHA512' : RSASHA512, - 'INDIRECT' : INDIRECT, - 'ECDSAP256SHA256' : ECDSAP256SHA256, - 'ECDSAP384SHA384' : ECDSAP384SHA384, - 'PRIVATEDNS' : PRIVATEDNS, - 'PRIVATEOID' : PRIVATEOID, - } - -# We construct the inverse mapping programmatically to ensure that we -# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that -# would cause the mapping not to be true inverse. - -_algorithm_by_value = dict([(y, x) for x, y in _algorithm_by_text.items()]) - -def algorithm_from_text(text): - """Convert text into a DNSSEC algorithm value - @rtype: int""" - - value = _algorithm_by_text.get(text.upper()) - if value is None: - value = int(text) - return value - -def algorithm_to_text(value): - """Convert a DNSSEC algorithm value to text - @rtype: string""" - - text = _algorithm_by_value.get(value) - if text is None: - text = str(value) - return text - -def _to_rdata(record, origin): - s = io.BytesIO() - record.to_wire(s, origin=origin) - return s.getvalue() - -def key_id(key, origin=None): - rdata = _to_rdata(key, origin) - if key.algorithm == RSAMD5: - return (rdata[-3] << 8) + rdata[-2] - else: - total = 0 - for i in range(len(rdata) // 2): - total += (rdata[2 * i] << 8) + rdata[2 * i + 1] - if len(rdata) % 2 != 0: - total += rdata[len(rdata) - 1] << 8 - total += ((total >> 16) & 0xffff); - return total & 0xffff - -def make_ds(name, key, algorithm, origin=None): - if algorithm.upper() == 'SHA1': - dsalg = 1 - hash = dns.hash.get('SHA1')() - elif algorithm.upper() == 'SHA256': - dsalg = 2 - hash = dns.hash.get('SHA256')() - else: - raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) - - if isinstance(name, str): - name = dns.name.from_text(name, origin) - hash.update(name.canonicalize().to_wire()) - hash.update(_to_rdata(key, origin)) - digest = hash.digest() - - dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest - return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0, - len(dsrdata)) - -def _find_candidate_keys(keys, rrsig): - candidate_keys=[] - value = keys.get(rrsig.signer) - if value is None: - return None - if isinstance(value, dns.node.Node): - try: - rdataset = value.find_rdataset(dns.rdataclass.IN, - dns.rdatatype.DNSKEY) - except KeyError: - return None - else: - rdataset = value - for rdata in rdataset: - if rdata.algorithm == rrsig.algorithm and \ - key_id(rdata) == rrsig.key_tag: - candidate_keys.append(rdata) - return candidate_keys - -def _is_rsa(algorithm): - return algorithm in (RSAMD5, RSASHA1, - RSASHA1NSEC3SHA1, RSASHA256, - RSASHA512) - -def _is_dsa(algorithm): - return algorithm in (DSA, DSANSEC3SHA1) - -def _is_ecdsa(algorithm): - return _have_ecdsa and (algorithm in (ECDSAP256SHA256, ECDSAP384SHA384)) - -def _is_md5(algorithm): - return algorithm == RSAMD5 - -def _is_sha1(algorithm): - return algorithm in (DSA, RSASHA1, - DSANSEC3SHA1, RSASHA1NSEC3SHA1) - -def _is_sha256(algorithm): - return algorithm in (RSASHA256, ECDSAP256SHA256) - -def _is_sha384(algorithm): - return algorithm == ECDSAP384SHA384 - -def _is_sha512(algorithm): - return algorithm == RSASHA512 - -def _make_hash(algorithm): - if _is_md5(algorithm): - return dns.hash.get('MD5')() - if _is_sha1(algorithm): - return dns.hash.get('SHA1')() - if _is_sha256(algorithm): - return dns.hash.get('SHA256')() - if _is_sha384(algorithm): - return dns.hash.get('SHA384')() - if _is_sha512(algorithm): - return dns.hash.get('SHA512')() - raise ValidationFailure('unknown hash for algorithm %u' % algorithm) - -def _make_algorithm_id(algorithm): - if _is_md5(algorithm): - oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05] - elif _is_sha1(algorithm): - oid = [0x2b, 0x0e, 0x03, 0x02, 0x1a] - elif _is_sha256(algorithm): - oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01] - elif _is_sha512(algorithm): - oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03] - else: - raise ValidationFailure('unknown algorithm %u' % algorithm) - olen = len(oid) - dlen = _make_hash(algorithm).digest_size - idbytes = [0x30] + [8 + olen + dlen] + \ - [0x30, olen + 4] + [0x06, olen] + oid + \ - [0x05, 0x00] + [0x04, dlen] - return bytes(idbytes) - -def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None): - """Validate an RRset against a single signature rdata - - The owner name of the rrsig is assumed to be the same as the owner name - of the rrset. - - @param rrset: The RRset to validate - @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset) - tuple - @param rrsig: The signature rdata - @type rrsig: dns.rrset.Rdata - @param keys: The key dictionary. - @type keys: a dictionary keyed by dns.name.Name with node or rdataset values - @param origin: The origin to use for relative names - @type origin: dns.name.Name or None - @param now: The time to use when validating the signatures. The default - is the current time. - @type now: int - """ - - if isinstance(origin, str): - origin = dns.name.from_text(origin, dns.name.root) - - for candidate_key in _find_candidate_keys(keys, rrsig): - if not candidate_key: - raise ValidationFailure('unknown key') - - # For convenience, allow the rrset to be specified as a (name, rdataset) - # tuple as well as a proper rrset - if isinstance(rrset, tuple): - rrname = rrset[0] - rdataset = rrset[1] - else: - rrname = rrset.name - rdataset = rrset - - if now is None: - now = time.time() - if rrsig.expiration < now: - raise ValidationFailure('expired') - if rrsig.inception > now: - raise ValidationFailure('not yet valid') - - hash = _make_hash(rrsig.algorithm) - - if _is_rsa(rrsig.algorithm): - keyptr = candidate_key.key - (count,) = struct.unpack('!B', keyptr[0:1]) - keyptr = keyptr[1:] - if count == 0: - (count,) = struct.unpack('!H', keyptr[0:2]) - keyptr = keyptr[2:] - rsa_e = keyptr[0:count] - rsa_n = keyptr[count:] - keylen = len(rsa_n) * 8 - pubkey = Crypto.PublicKey.RSA.construct( - (Crypto.Util.number.bytes_to_long(rsa_n), - Crypto.Util.number.bytes_to_long(rsa_e))) - sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),) - elif _is_dsa(rrsig.algorithm): - keyptr = candidate_key.key - (t,) = struct.unpack('!B', keyptr[0:1]) - keyptr = keyptr[1:] - octets = 64 + t * 8 - dsa_q = keyptr[0:20] - keyptr = keyptr[20:] - dsa_p = keyptr[0:octets] - keyptr = keyptr[octets:] - dsa_g = keyptr[0:octets] - keyptr = keyptr[octets:] - dsa_y = keyptr[0:octets] - pubkey = Crypto.PublicKey.DSA.construct( - (Crypto.Util.number.bytes_to_long(dsa_y), - Crypto.Util.number.bytes_to_long(dsa_g), - Crypto.Util.number.bytes_to_long(dsa_p), - Crypto.Util.number.bytes_to_long(dsa_q))) - (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:]) - sig = (Crypto.Util.number.bytes_to_long(dsa_r), - Crypto.Util.number.bytes_to_long(dsa_s)) - elif _is_ecdsa(rrsig.algorithm): - if rrsig.algorithm == ECDSAP256SHA256: - curve = ecdsa.curves.NIST256p - key_len = 32 - digest_len = 32 - elif rrsig.algorithm == ECDSAP384SHA384: - curve = ecdsa.curves.NIST384p - key_len = 48 - digest_len = 48 - else: - # shouldn't happen - raise ValidationFailure('unknown ECDSA curve') - keyptr = candidate_key.key - x = Crypto.Util.number.bytes_to_long(keyptr[0:key_len]) - y = Crypto.Util.number.bytes_to_long(keyptr[key_len:key_len * 2]) - assert ecdsa.ecdsa.point_is_valid(curve.generator, x, y) - point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order) - verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point, - curve) - pubkey = ECKeyWrapper(verifying_key, key_len) - r = rrsig.signature[:key_len] - s = rrsig.signature[key_len:] - sig = ecdsa.ecdsa.Signature(Crypto.Util.number.bytes_to_long(r), - Crypto.Util.number.bytes_to_long(s)) - else: - raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm) - - hash.update(_to_rdata(rrsig, origin)[:18]) - hash.update(rrsig.signer.to_digestable(origin)) - - if rrsig.labels < len(rrname) - 1: - suffix = rrname.split(rrsig.labels + 1)[1] - rrname = dns.name.from_text('*', suffix) - rrnamebuf = rrname.to_digestable(origin) - rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass, - rrsig.original_ttl) - rrlist = sorted(rdataset); - for rr in rrlist: - hash.update(rrnamebuf) - hash.update(rrfixed) - rrdata = rr.to_digestable(origin) - rrlen = struct.pack('!H', len(rrdata)) - hash.update(rrlen) - hash.update(rrdata) - - digest = hash.digest() - - if _is_rsa(rrsig.algorithm): - # PKCS1 algorithm identifier goop - digest = _make_algorithm_id(rrsig.algorithm) + digest - padlen = keylen // 8 - len(digest) - 3 - digest = bytes([0]) + bytes([1]) + bytes([0xFF]) * padlen + \ - bytes([0]) + digest - elif _is_dsa(rrsig.algorithm) or _is_ecdsa(rrsig.algorithm): - pass - else: - # Raise here for code clarity; this won't actually ever happen - # since if the algorithm is really unknown we'd already have - # raised an exception above - raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm) - - if pubkey.verify(digest, sig): - return - raise ValidationFailure('verify failure') - -def _validate(rrset, rrsigset, keys, origin=None, now=None): - """Validate an RRset - - @param rrset: The RRset to validate - @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset) - tuple - @param rrsigset: The signature RRset - @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset) - tuple - @param keys: The key dictionary. - @type keys: a dictionary keyed by dns.name.Name with node or rdataset values - @param origin: The origin to use for relative names - @type origin: dns.name.Name or None - @param now: The time to use when validating the signatures. The default - is the current time. - @type now: int - """ - - if isinstance(origin, str): - origin = dns.name.from_text(origin, dns.name.root) - - if isinstance(rrset, tuple): - rrname = rrset[0] - else: - rrname = rrset.name - - if isinstance(rrsigset, tuple): - rrsigname = rrsigset[0] - rrsigrdataset = rrsigset[1] - else: - rrsigname = rrsigset.name - rrsigrdataset = rrsigset - - rrname = rrname.choose_relativity(origin) - rrsigname = rrname.choose_relativity(origin) - if rrname != rrsigname: - raise ValidationFailure("owner names do not match") - - for rrsig in rrsigrdataset: - try: - _validate_rrsig(rrset, rrsig, keys, origin, now) - return - except ValidationFailure: - pass - raise ValidationFailure("no RRSIGs validated") - -def _need_pycrypto(*args, **kwargs): - raise NotImplementedError("DNSSEC validation requires pycrypto") - -try: - import Crypto.PublicKey.RSA - import Crypto.PublicKey.DSA - import Crypto.Util.number - validate = _validate - validate_rrsig = _validate_rrsig - _have_pycrypto = True -except ImportError: - validate = _need_pycrypto - validate_rrsig = _need_pycrypto - _have_pycrypto = False - -try: - import ecdsa - import ecdsa.ecdsa - import ecdsa.ellipticcurve - import ecdsa.keys - _have_ecdsa = True - - class ECKeyWrapper(object): - def __init__(self, key, key_len): - self.key = key - self.key_len = key_len - def verify(self, digest, sig): - diglong = Crypto.Util.number.bytes_to_long(digest) - return self.key.pubkey.verifies(diglong, sig) - -except ImportError: - _have_ecdsa = False |
