Ticket #6797: twisted_names_trunk_CERT.2.diff

File twisted_names_trunk_CERT.2.diff, 16.8 KB (added by Aaron Spike, 7 years ago)

Version of patch with tests. Sorry about the last file (which was a copy of the dnskey tests).

  • twisted/internet/interfaces.py

     
    570570        """
    571571
    572572
     573    def lookupCertificate(name, timeout=None):
     574        """
     575        Perform a CERT record lookup.
     576
     577        @type name: C{str}
     578        @param name: DNS name to resolve.
     579
     580        @type timeout: Sequence of C{int}
     581        @param timeout: Number of seconds after which to reissue the query.
     582            When the last timeout expires, the query is considered failed.
     583
     584        @rtype: L{Deferred}
     585        @return: A L{Deferred} which fires with a three-tuple of lists of
     586            L{twisted.names.dns.RRHeader} instances.  The first element of the
     587            tuple gives answers.  The second element of the tuple gives
     588            authorities.  The third element of the tuple gives additional
     589            information.  The L{Deferred} may instead fail with one of the
     590            exceptions defined in L{twisted.names.error} or with
     591            C{NotImplementedError}.
     592        """
     593
     594
    573595    def lookupNamingAuthorityPointer(name, timeout=None):
    574596        """
    575597        Perform a NAPTR record lookup.
  • twisted/names/client.py

     
    725725
    726726
    727727
     728def lookupCertificate(name, timeout=None):
     729    return getResolver().lookupCertificate(name, timeout)
     730
     731
     732
    728733def lookupResponsibility(name, timeout=None):
    729734    return getResolver().lookupResponsibility(name, timeout)
    730735
  • twisted/names/common.py

     
    149149        return self._lookup(name, dns.IN, dns.SPF, timeout)
    150150
    151151
     152    def lookupCertificate(self, name, timeout=None):
     153        return self._lookup(name, dns.IN, dns.CERT, timeout)
     154
     155
    152156    def lookupResponsibility(self, name, timeout=None):
    153157        return self._lookup(name, dns.IN, dns.RP, timeout)
    154158
     
    240244    dns.MX:    'lookupMailExchange',
    241245    dns.TXT:   'lookupText',
    242246    dns.SPF:   'lookupSenderPolicy',
     247    dns.CERT:   'lookupCertificate',
    243248
    244249    dns.RP:    'lookupResponsibility',
    245250    dns.AFSDB: 'lookupAFSDatabase',
  • twisted/names/dns.py

     
    1414__all__ = [
    1515    'IEncodable', 'IRecord',
    1616
    17     'A', 'A6', 'AAAA', 'AFSDB', 'CNAME', 'DNAME', 'HINFO',
     17    'A', 'A6', 'AAAA', 'AFSDB', 'CERT', 'CNAME', 'DNAME', 'HINFO',
    1818    'MAILA', 'MAILB', 'MB', 'MD', 'MF', 'MG', 'MINFO', 'MR', 'MX',
    1919    'NAPTR', 'NS', 'NULL', 'OPT', 'PTR', 'RP', 'SOA', 'SPF', 'SRV', 'TXT',
    2020    'WKS',
     
    4444
    4545
    4646# System imports
    47 import struct, random, socket
     47import base64, struct, random, socket
    4848from itertools import chain
    4949
    5050from io import BytesIO
     
    116116AAAA = 28
    117117SRV = 33
    118118NAPTR = 35
     119CERT = 37
    119120A6 = 38
    120121DNAME = 39
    121122OPT = 41
     
    146147    AAAA: 'AAAA',
    147148    SRV: 'SRV',
    148149    NAPTR: 'NAPTR',
     150    CERT: 'CERT',
    149151    A6: 'A6',
    150152    DNAME: 'DNAME',
    151153    OPT: 'OPT',
     
    15551557
    15561558
    15571559
     1560def _base64Format(bytes):
     1561    """
     1562    Base64 encode C{bytes} without any line breaks
     1563
     1564    @param bytes: The bytestring to encode.
     1565    @type bytes: L{bytes}
     1566
     1567    @return: The formatted base64 bytestring.
     1568    """
     1569    return nativeString(base64.encodestring(bytes).replace(b'\n', b''))
     1570
     1571
     1572
    15581573@implementer(IEncodable, IRecord)
     1574class Record_CERT(tputil.FancyStrMixin, tputil.FancyEqMixin):
     1575    """
     1576    A Certificate record.
     1577
     1578    @see: U{http://tools.ietf.org/html/rfc4398}
     1579
     1580    @ivar TYPE: CERT type code constant C{37}.
     1581    @ivar fancybasename: See L{tputil.FancyStrMixin}
     1582    @ivar showAttributes: See L{tputil.FancyStrMixin}
     1583    @ivar compareAttributes: See L{tputil.FancyEqMixin}
     1584
     1585    @ivar certType: See L{__init__}
     1586    @ivar keyTag: See L{__init__}
     1587    @ivar algorithm: See L{__init__}
     1588    @ivar certOrCRL: See L{__init__}
     1589    @ivar ttl: See L{__init__}
     1590    """
     1591
     1592    TYPE = CERT
     1593
     1594    fancybasename = 'CERT'
     1595
     1596    showAttributes = (
     1597        'certType', 'keyTag', 'algorithm',
     1598        ('certOrCRL', _base64Format), 'ttl')
     1599
     1600    compareAttributes = (
     1601        'certType', 'keyTag', 'algorithm',
     1602        'certOrCRL', 'ttl')
     1603
     1604    _fmt = '!HHB'
     1605    _fmt_size = struct.calcsize(_fmt)
     1606
     1607    def __init__(self, certType=1, keyTag=0,
     1608                 algorithm=5, certOrCRL=b'', ttl=None):
     1609        """
     1610        @param certType: an L{int} representing the certificate type
     1611            used in this CERT record. The default value (C{1}) represents
     1612            X.509 as per PKIX. See
     1613            U{https://www.iana.org/assignments/cert-rr-types/cert-rr-types.xhtml}
     1614        @type certType: L{int}
     1615
     1616        @param keyTag: Key tag value as described in RRSIG.
     1617            The default value (C{0}) is recommende if the
     1618            algorithm field is 0. See
     1619            U{https://tools.ietf.org/html/rfc4034#appendix-B}
     1620        @type keyTag: L{int}
     1621
     1622        @param algorithm: an L{int} representing the algorithm number
     1623            used in this CERT. The default value (C{5}) represents
     1624            RSA/SHA-1. These values are the same as used for DNSSEC. See
     1625            U{https://www.iana.org/assignments/dns-sec-alg-numbers/dns-sec-alg-numbers.txt}
     1626        @type algorithm: L{int}
     1627
     1628        @param certOrCRL: a base64 encoded string representing the
     1629            stored certificate or CRL.
     1630        @type certOrCRL: L{bytes}
     1631
     1632        @param ttl: The time-to-live of this record. TTL can be
     1633            supplied as an L{int} representing a period in seconds or
     1634            a human readable string can be supplied which will be
     1635            parsed by L{str2time}. Default is C{None}.
     1636        @type ttl: L{int} or L{str} or C{None}.
     1637        """
     1638        self.certType = certType
     1639        self.keyTag = keyTag
     1640        self.algorithm = algorithm
     1641        self.certOrCRL = certOrCRL
     1642        self.ttl = str2time(ttl)
     1643
     1644
     1645    def encode(self, strio, compDict=None):
     1646        strio.write(
     1647            struct.pack(self._fmt, self.certType, self.keyTag, self.algorithm))
     1648        strio.write(self.certOrCRL)
     1649
     1650
     1651    def decode(self, strio, length=None):
     1652        hdr = readPrecisely(strio, self._fmt_size)
     1653        self.certType, self.keyTag, self.algorithm = struct.unpack(self._fmt, hdr)
     1654
     1655        length -= self._fmt_size
     1656        self.certOrCRL = readPrecisely(strio, length)
     1657
     1658
     1659    def __hash__(self):
     1660        """
     1661        A has allowing this L{Record_CERT} to be used as a L{dict}
     1662        key.
     1663
     1664        @return: A L{hash} og the values of
     1665            L{Record_CERT.compareAttributes} except C{ttl}.
     1666        """
     1667        return hash(tuple(getattr(self, k)
     1668                          for k in self.compareAttributes
     1669                          if k != 'ttl'))
     1670
     1671
     1672
     1673@implementer(IEncodable, IRecord)
    15591674class Record_AFSDB(tputil.FancyStrMixin, tputil.FancyEqMixin):
    15601675    """
    15611676    Map from a domain name to the name of an AFS cell database server.
  • twisted/names/test/test_client.py

     
    974974        d.addCallback(self.checkResult, dns.SPF)
    975975        return d
    976976
     977    def test_lookupCertificate(self):
     978        """
     979        See L{test_lookupAddress}
     980        """
     981        d = client.lookupCertificate(self.hostname)
     982        d.addCallback(self.checkResult, dns.CERT)
     983        return d
     984
    977985    def test_lookupResponsibility(self):
    978986        """
    979987        See L{test_lookupAddress}
  • twisted/names/test/test_dns.py

     
    3030    dns.Record_WKS, dns.Record_SRV, dns.Record_AFSDB, dns.Record_RP,
    3131    dns.Record_HINFO, dns.Record_MINFO, dns.Record_MX, dns.Record_TXT,
    3232    dns.Record_AAAA, dns.Record_A6, dns.Record_NAPTR, dns.UnknownRecord,
     33    dns.Record_CERT,
    3334    ]
    3435
    3536
     
    570571        self._recordRoundtripTest(dns.Record_TXT(b'foo', b'bar'))
    571572
    572573
     574    def test_CERT(self):
     575        """
     576        The byte stream written by L{dns.Record_CERT.encode} can be used by
     577        L{dns.Record_CERT.decode} to reconstruct the state of the original
     578        L{dns.Record_Cert} instance.
     579        """
     580        self._recordRoundtripTest(dns.Record_CERT(certOrCRL=b'foobar'))
    573581
     582
    574583MESSAGE_AUTHENTIC_DATA_BYTES = (
    575584    b'\x00\x00' # ID
    576585    b'\x00' #
     
    12911300            "<SPF data=['foo', 'bar'] ttl=15>")
    12921301
    12931302
     1303    def test_cert(self):
     1304        """
     1305        The repr of a L{dns.Record_CERT} instance includes the data and ttl
     1306        fields of the record.
     1307        """
     1308        self.assertEqual(
     1309            repr(dns.Record_CERT(certOrCRL=b'foobar', ttl=15)),
     1310            "<CERT certType=1 keyTag=0 algorithm=5 certOrCRL=Zm9vYmFy ttl=15>")
     1311
     1312
    12941313    def test_unknown(self):
    12951314        """
    12961315        The repr of a L{dns.UnknownRecord} instance includes the data and ttl
     
    18501869            dns.Record_SPF('foo', 'bar', ttl=100))
    18511870
    18521871
     1872    def test_cert(self):
     1873        """
     1874        L{dns.Record_CERT} instances compare equal if and only if
     1875        they have the same certType, keyTag, algorithm,
     1876        certOrCRL, and ttl.
     1877        """
     1878        self._equalityTest(
     1879            dns.Record_CERT(certType=1),
     1880            dns.Record_CERT(certType=1),
     1881            dns.Record_CERT(certType=2))
     1882
     1883        self._equalityTest(
     1884            dns.Record_CERT(keyTag=1),
     1885            dns.Record_CERT(keyTag=1),
     1886            dns.Record_CERT(keyTag=2))
     1887
     1888        self._equalityTest(
     1889            dns.Record_CERT(algorithm=1),
     1890            dns.Record_CERT(algorithm=1),
     1891            dns.Record_CERT(algorithm=2))
     1892
     1893        self._equalityTest(
     1894            dns.Record_CERT(certOrCRL=b'foo'),
     1895            dns.Record_CERT(certOrCRL=b'foo'),
     1896            dns.Record_CERT(certOrCRL=b'bar'))
     1897
     1898        self._equalityTest(
     1899            dns.Record_CERT(ttl=10),
     1900            dns.Record_CERT(ttl=10),
     1901            dns.Record_CERT(ttl=100))
     1902
     1903
     1904
    18531905    def test_unknown(self):
    18541906        """
    18551907        L{dns.UnknownRecord} instances compare equal if and only if they have
     
    26192671        o.decode(b)
    26202672        self.assertEqual(o.code, 1)
    26212673        self.assertEqual(o.data, b'foobar')
     2674
     2675
     2676class CERT_TEST_DATA(object):
     2677    """
     2678    Generate byte and instance representations of an
     2679    L{dns.Record_CERT} where all attributes are set to non-default
     2680    values.
     2681
     2682    For testing whether attributes have really been read from the byte
     2683    string during decoding.
     2684    """
     2685    @classmethod
     2686    def BYTES(cls):
     2687        """
     2688        @return: L{bytes} representing the encoded CERT record returned
     2689            by L{OBJECT}.
     2690        """
     2691        return (
     2692            b'\x00\x02' # certType
     2693            b'\x00\x01' # keyTag
     2694            b'\x04' # algorithm
     2695            b'foobar') # certOrCRL
     2696
     2697
     2698    @classmethod
     2699    def OBJECT(cls):
     2700        """
     2701        @return: A L{dns.Record_CERT} instance with attributes that
     2702            match the encoded record returned by L{BYTES}.
     2703        """
     2704        return dns.Record_CERT(
     2705            certType=2,
     2706            keyTag=1,
     2707            algorithm=4,
     2708            certOrCRL=b'foobar')
     2709
     2710
     2711
     2712class CERTRecordTests(unittest.TestCase):
     2713    """
     2714    Tests for L{dns.Record_CERT}.
     2715    """
     2716
     2717    def test_certTypeDefaultAttribute(self):
     2718        """
     2719        L{dns.Record_CERT.certType} is a public L{int} attribute
     2720        encoding the type of key the record holds. The
     2721        default value is C{1}.
     2722
     2723        http://tools.ietf.org/html/rfc4398#section-2.1
     2724        https://www.iana.org/assignments/cert-rr-types/cert-rr-types.xhtml
     2725        """
     2726        record = dns.Record_CERT()
     2727        self.assertEqual(record.certType, 1)
     2728
     2729
     2730    def test_certTypeOverride(self):
     2731        """
     2732        L{dns.Record_CERT.__init__} accepts a C{certType} parameter
     2733        which overrides the L{dns.Record_CERT.certType} attribute.
     2734        """
     2735        record = dns.Record_CERT(certType=2)
     2736        self.assertEqual(record.certType, 2)
     2737
     2738
     2739    def test_keyTagDefaultAttribute(self):
     2740        """
     2741        L{dns.Record_CERT.keyTag} is a public L{int}
     2742        attribute holding an identifier generated from the certificate.
     2743        The default is C{0}.
     2744        """
     2745        record = dns.Record_CERT()
     2746        self.assertEqual(record.keyTag, 0)
     2747
     2748
     2749    def test_keyTagOverride(self):
     2750        """
     2751        L{dns.Record_CERT.__init__} accepts a C{keyTag}
     2752        parameter which overrides the
     2753        L{dns.Record_CERT.keyTag} attribute.
     2754        """
     2755        record = dns.Record_CERT(keyTag=1)
     2756        self.assertEqual(record.keyTag, 1)
     2757
     2758
     2759    def test_algorithmDefaultAttribute(self):
     2760        """
     2761        L{dns.Record_CERT.algorithm} is a public L{int} attribute
     2762        whose default value is 5 (RSA/SHA-1).
     2763
     2764        Values are defined in DNSSEC
     2765        https://tools.ietf.org/html/rfc4034#section-2.1.3
     2766        """
     2767        record = dns.Record_CERT()
     2768        self.assertEqual(record.algorithm, 5)
     2769
     2770
     2771    def test_algorithmOverride(self):
     2772        """
     2773        L{dns.Record_CERT.__init__} accepts a C{algorithm}
     2774        parameter which overrides the
     2775        L{dns.Record_CERT.algorithm} attribute.
     2776        """
     2777        record = dns.Record_CERT(algorithm=255)
     2778        self.assertEqual(record.algorithm, 255)
     2779
     2780
     2781    def test_certOrCRLDefaultAttribute(self):
     2782        """
     2783        L{dns.Record_CERT.certOrCRL} is a public L{bytes} attribute
     2784        whose default value is C{b''}.
     2785        """
     2786        record = dns.Record_CERT()
     2787        self.assertEqual(record.certOrCRL, b'')
     2788
     2789
     2790    def test_certOrCRLOverride(self):
     2791        """
     2792        L{dns.Record_CERT.__init__} accepts a C{certOrCRL}
     2793        parameter which overrides the
     2794        L{dns.Record_CERT.certOrCRL} attribute.
     2795        """
     2796        record = dns.Record_CERT(certOrCRL=b'foobar')
     2797        self.assertEqual(record.certOrCRL, b'foobar')
     2798
     2799
     2800    def test_encode(self):
     2801        """
     2802        L{dns.Record_CERT.encode} packs the header fields and the
     2803        key and writes them to a file like object passed in as an
     2804        argument.
     2805        """
     2806        record = CERT_TEST_DATA.OBJECT()
     2807        actualBytes = BytesIO()
     2808        record.encode(actualBytes)
     2809
     2810        self.assertEqual(actualBytes.getvalue(), CERT_TEST_DATA.BYTES())
     2811
     2812
     2813    def test_decode(self):
     2814        """
     2815        L{dns.Record_CERT.decode} unpacks the header fields from a file
     2816        like object and populates the attributes of an existing
     2817        L{dns.Record_CERT} instance.
     2818        """
     2819        expectedBytes = CERT_TEST_DATA.BYTES()
     2820        record = dns.Record_CERT()
     2821        record.decode(BytesIO(expectedBytes), length=len(expectedBytes))
     2822
     2823        self.assertEqual(record, CERT_TEST_DATA.OBJECT())
     2824
     2825
     2826    def test_decodeShorterThanHeader(self):
     2827        """
     2828        L{dns.Record_CERT.decode} raises L{EOFError} if the provided
     2829        file object is shorter than the fixed length header parts. ie
     2830        everything except key.
     2831        """
     2832        record = dns.Record_CERT()
     2833
     2834        self.assertRaises(EOFError, record.decode, BytesIO(b'x'), length=1)
     2835
     2836
     2837    def test_decodeShorterThanKey(self):
     2838        """
     2839        L{dns.Record_CERT.decode} raises L{EOFError} if the provided
     2840        file object is shorter than length provided in the length
     2841        argument.
     2842        """
     2843        expectedBytes = CERT_TEST_DATA.BYTES()
     2844        record = dns.Record_CERT()
     2845
     2846        self.assertRaises(
     2847                EOFError,
     2848                record.decode,
     2849                BytesIO(expectedBytes[:-1]), length=len(expectedBytes))
  • twisted/names/test/test_names.py

     
    7474            dns.Record_NS('39.28.189.39'),
    7575            dns.Record_SPF('v=spf1 mx/30 mx:example.org/30 -all'),
    7676            dns.Record_SPF('v=spf1 +mx a:\0colo', '.example.com/28 -all not valid'),
     77            dns.Record_CERT(certOrCRL=b'\x01\x02'),
    7778            dns.Record_MX(10, 'host.test-domain.com'),
    7879            dns.Record_HINFO(os='Linux', cpu='A Fast One, Dontcha know'),
    7980            dns.Record_CNAME('canonical.name.com'),
     
    361362        )
    362363
    363364
     365    def testCERT(self):
     366        """
     367        L{DNSServerFactory} can serve I{CERT} resource records.
     368        """
     369        return self.namesTest(
     370            self.resolver.lookupCertificate('test-domain.com'),
     371            [dns.Record_CERT(certOrCRL=b'\x01\x02', ttl=19283784)]
     372        )
     373
    364374    def testWKS(self):
    365375        """Test DNS 'WKS' record queries"""
    366376        return self.namesTest(