diff options
| -rw-r--r-- | acme_dns_tiny.py | 46 | ||||
| -rw-r--r-- | tests/test_acme_dns_tiny.py | 315 | 
2 files changed, 232 insertions, 129 deletions
| diff --git a/acme_dns_tiny.py b/acme_dns_tiny.py index 39e0be3..d89a46a 100644 --- a/acme_dns_tiny.py +++ b/acme_dns_tiny.py @@ -11,6 +11,7 @@ exceeding 80cols.  Usage:    acme_dns_tiny.py [--contact=MAIL]... [--quiet] [--verbose]                     [--acme-directory=URL] [--ttl=SECONDS] +                   [--separator=STR]                     (--account-key=PATH) (--csr=PATH) (--script=PATH)    acme_dns_tiny.py -h|--help @@ -23,6 +24,7 @@ Options:    --acme-directory=URL  where to make acme request, default is Let's Encrypt    --contact=MAIL        create/update contact info (ex "mailto:x@example.com")    --ttl=SECONDS         time before (re)try self check +  --separator=STR       list all chains joined by STR if specified    --script=PATH         script to run to update the DNS server record  Called script: @@ -80,6 +82,7 @@ ACME_DEFAULT_DIRECTORY = 'https://acme-v02.api.letsencrypt.org/directory'  # consistent logging: global instance share accross all this module  log = logging.getLogger('acme_dns_tiny') +log_configure = True # useful for unit tests  def b64(b): @@ -126,7 +129,7 @@ def self_challenge_check(*, domain=None, ttl=None, challenge=None):          time.sleep(ttl)          log.debug('Self check number %s', checknum)          try: -            responses = resolver.query(domain, rdtype='TXT').rrset +            responses = resolver.resolve(domain, rdtype='TXT').rrset          except dns.exception.DNSException as e:              log.debug('  - DNS error: %s: %s', type(e).__name__, e)              continue @@ -153,7 +156,7 @@ class ACME:      def generate_jws_header(self, account_key_path):          log.debug('Read account key.') -        out = openssl(['rsa', '-in', account_key_path, '-noout', '-text']) +        out = openssl(['pkey', '-in', account_key_path, '-noout', '-text'])          pub_hex, pub_exp = re.search(              r'modulus:\r?\n\s+00:([a-f0-9\:\s]+?)'              r'\r?\npublicExponent: ([0-9]+)', @@ -278,13 +281,20 @@ class ACME:          domain = f'_acme-challenge.{domain}.'          return domain, keydigest64, challenge['url'], keyauth -    def get_certificate(self, crt_url, ): +    def get_chains(self, crt_url, separator=None): +        headers = {'Accept': 'application/pem-certificate-chain'}          log.info('Signed certificate at: %s', crt_url) -        sreq = self.sreq(crt_url, -                headers={'Accept': 'application/pem-certificate-chain'}) -        log.info('  - Certificate links given by server: %s', -                sreq.headers.get('link', '')) -        return sreq.text +        sreq = self.sreq(crt_url, headers=headers) + +        if separator is None: +            return sreq.text + +        chains = [sreq.text] +        regex = re.compile(r'\s*<(\S+)>\s*;\s*rel\s*=\s*"alternate"\s*') +        for match in regex.finditer(sreq.headers.get('Link', '')): +            chains.append(self.sreq(match.group(1), headers=headers).text) + +        return separator.join(chains)  def get_crt(args): @@ -307,17 +317,23 @@ def get_crt(args):      csr_der = b64(openssl(['req', '-in', args['--csr'], '-outform', 'DER']))      order = acme.validate_order(order['finalize'], order_location, csr_der) -    return acme.get_certificate(order['certificate']) +    separator = None +    if args['--separator'] is not None: +        separator = args['--separator'].encode('utf8').decode('unicode_escape') +    chains = acme.get_chains(order['certificate'], separator) + +    return chains  def main(args):      args = args or docopt.docopt(__doc__) -    log.addHandler(logging.StreamHandler()) -    log.setLevel( -            (args['--verbose'] and logging.DEBUG) or -            (args['--quiet']   and logging.ERROR) or -            logging.INFO -    ) +    if log_configure: +        log.addHandler(logging.StreamHandler()) +        log.setLevel( +                (args['--verbose'] and logging.DEBUG) or +                (args['--quiet']   and logging.ERROR) or +                logging.INFO +        )      print(get_crt(args), end='') diff --git a/tests/test_acme_dns_tiny.py b/tests/test_acme_dns_tiny.py index bb96765..0d61b97 100644 --- a/tests/test_acme_dns_tiny.py +++ b/tests/test_acme_dns_tiny.py @@ -2,12 +2,14 @@  import collections +import contextlib  import inspect  import logging  import os  import subprocess  import sys  import tempfile +import time  import pytest @@ -18,6 +20,9 @@ sys.path.append(os.path.realpath("."))  import acme_dns_tiny +acme_dns_tiny.log_configure = False + +  ACME_STAGING_DIRECTORY = \          'https://acme-staging-v02.api.letsencrypt.org/directory'  DOMAIN  = os.getenv('TEST_DOMAIN') @@ -25,149 +30,231 @@ CONTACT = os.getenv('TEST_CONTACT') # form: mailto:name@domain  SCRIPT  = os.getenv('TEST_SCRIPT') -def funcname(): -    return inspect.currentframe().f_back.f_code.co_name - - -def parent_funcname(): -    return inspect.currentframe().f_back.f_back.f_code.co_name - - -def assert_success(capsys, args): -    name = parent_funcname() -    logging.info(f'assert_success({name}, {args})') -    acme_dns_tiny.main(args) -    captured = capsys.readouterr() -    #assert not captured.err -    certlist = captured.out.split('-----BEGIN CERTIFICATE-----') -    assert len(certlist) == 3 -    assert certlist[0] == '' -    assert '-----END CERTIFICATE-----\n' in certlist[1] -    assert '-----END CERTIFICATE-----\n' in certlist[2] -    textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'], -            stdin=captured.out) -    assert 'Issuer' in textchain - - -def assert_assertion(args): -    name = parent_funcname() -    logging.info(f'assert_assertion({name}, {args})') -    with pytest.raises(ValueError): -        acme_dns_tiny.main(args) +# all possible key types and raise expectation (key considered too weak) +key_types = ( +        ('rsa', 4096, False), +        ('rsa', 2048, False), +        ('rsa', 1024, True), # this key type is not recommended +        ('ecdsa', 384, False), +        ('ecdsa', 256, False), +) + + +@contextlib.contextmanager +def does_not_raise(): +    yield + + +def _gen_rsa_key(path, size): +    acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)]) + + +def _gen_ecdsa_key(path, size): +    acme_dns_tiny.openssl([ +        'ecparam', +        '-genkey', +        '-name', f'secp{size}r1', +        '-out', path, +    ]) + + +#@pytest.fixture(scope='module') +def keys_generator(): +    #tmpdir = tmpdir_factory.mktemp("data") + +    def _gen_key(role, type_name, key_size): +        name = f'{role}_{type_name}_{key_size}.key' +        generator_map = {'rsa': _gen_rsa_key, 'ecdsa': _gen_ecdsa_key} +        #path = str(tmpdir.join(name)) +        def operate(tmpdir): +            path = str(tmpdir.join(name)) +            if not os.path.exists(path): +                generator_map[type_name](path, key_size) +            return path +        return operate +        #return { +        #        'name': name, +        #        'path': operate, +        #} + +    def _gen_keys_normal(type_account, type_domain): +        account_key = _gen_key('account', type_account[0], type_account[1]) +        domain_key = _gen_key('domain', type_domain[0], type_domain[1]) + +        return { +                'account_key_path': account_key, +                'domain_key_path':  domain_key, +                'raise_expected':   type_account[2] or type_domain[2], +        } + +    def _normal_name(type1, type2): +        return f'{type1[0]}{type1[1]}/{type2[0]}{type2[1]}' + +    def _gen_keys_same(type_accountdomain): +        keypath = _gen_key( +                'accountdomain', type_accountdomain[0], type_accountdomain[1]) +        return { +                'account_key_path':   keypath, +                'domain_key_path':    keypath, +                'raise_expected':     True, +        } + +    def _same_name(type1): +        return f'{type1[0]}{type1[1]}/same' + +    from itertools import chain, product +    yield from chain( +            (pytest.param(_gen_keys_normal(*x), id=_normal_name(*x)) +                for x in product(key_types, key_types) +                # TODO: as for now client does not support ec account keytype +                # so skip it for now. +                if 'ecdsa' not in x[0][0]), +            (pytest.param(_gen_keys_same(x), id=_same_name(x)) +                for x in key_types +                # TODO: as for now client does not support ec account keytype +                # so skip it for now. +                if 'ecdsa' not in x[0]), +    ) + + +@pytest.fixture(scope='module', params=keys_generator()) +def account_domain_key(request): +    return request.param + + +@pytest.fixture(scope='module', params=[ +    {'subj': f'/CN={DOMAIN}',   'san': None}, +    {'subj': f'/CN=*.{DOMAIN}', 'san': None}, +    {'subj': f'/',              'san': f'DNS:{DOMAIN},DNS:www.{DOMAIN}'}, +    {'subj': f'/CN={DOMAIN}',   'san': f'DNS:www.{DOMAIN}'}, +    {'subj': f'/',              'san': f'DNS:{DOMAIN},DNS:*.{DOMAIN}'}, +]) +def subj_fixture(request): +    return request.param + + +CSR_ARGS_COUNTER = 0 + +@pytest.fixture(scope='module', params=[ +    pytest.param(None, id='no_separator/no_contact'), +    pytest.param({'args': {'--separator': '\\0'}}, id='separator'), +    pytest.param({'args': {'--contact': [CONTACT]}}, id='contact'), +    pytest.param({'args': { +        '--contact': [CONTACT], +        '--separator': '\\0', +        }}, id='separator+contact'), +]) +def main_args(tmpdir_factory, account_domain_key, request, subj_fixture): + +    global CSR_ARGS_COUNTER +    CSR_ARGS_COUNTER += 1 -@pytest.fixture(scope='module') -def keys(tmpdir_factory): -    tmpdir = tmpdir_factory.mktemp("data") - -    def _gen_key(name, size): -        path = str(tmpdir.join(name)) -        acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)]) -        return path - -    return { -            'account_key':      _gen_key('account_key', 4096), -            'domain_key':       _gen_key('domain_key', 4096), -            'weak_account_key': _gen_key('weak_account_key', 1024), -            'weak_domain_key':  _gen_key('weak_domain_key', 1024), -    } - - -@pytest.fixture(scope='module') -def csr_args(tmpdir_factory, keys):      tmpdir = tmpdir_factory.mktemp("data") +    #keysdef = account_domain_key.values[0] +    keysdef = account_domain_key +    #print('debug', keysdef)      default_args = {              '--acme-directory': ACME_STAGING_DIRECTORY,              '--script': SCRIPT, -            '--account-key': keys['account_key'], +            '--account-key': keysdef['account_key_path'](tmpdir),              #'--ttl': 60, # already the default              '--ttl': None, +            '--separator': None,              '--contact': None,              '--verbose': None,              '--quiet': None,      } -    def _csr(subj, key_name='domain_key', san=None, args={}): -        name = parent_funcname() -        domain_key = keys[key_name] -        path = str(tmpdir.join(name)) -        if san is None: -            acme_dns_tiny.openssl([ -                'req', '-new', '-key', domain_key, '-subj', subj, '-out', path -            ]) -        else: -            with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \ -                    open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig: -                conf.write(orig.read()) -                conf.write(f'\n[SAN]\nsubjectAltName={san}\n') -                conf.flush() -                acme_dns_tiny.openssl([ -                    'req', '-new', '-key', domain_key, '-subj', subj, -                    '-reqexts', 'SAN', '-config', conf.name, '-out', path -                ]) -        return {**default_args, '--csr': path, **args} - -    return _csr +    if request.param: +        default_args.update(request.param.get('args', {})) + +    if default_args['--separator'] is not None: +        assert default_args['--separator'] == '\\0' +        assert default_args['--separator'].encode( +                'utf8').decode('unicode_escape') == '\0' + +    name = f'{CSR_ARGS_COUNTER:02X}' +    domain_key = keysdef['domain_key_path'](tmpdir) +    path = str(tmpdir.join(name)) +    with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \ +            open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig: +        conf.write(orig.read()) +        conf.write(f'\n[SAN]\nsubjectAltName={subj_fixture["san"]}\n') +        conf.flush() +        openssl_san_args = [] +        if subj_fixture['san']: +            openssl_san_args = ['-reqexts', 'SAN', '-config', conf.name] +        acme_dns_tiny.openssl([ +            'req', '-new', '-key', domain_key, +            '-subj', subj_fixture['subj'], *openssl_san_args, +            '-out', path +        ]) + +    return {**default_args, '--csr': path}, keysdef['raise_expected']  def test_sanity_env(): +    'check environment is correctly set before running other tests'      assert bool(DOMAIN)      assert bool(CONTACT)      assert bool(SCRIPT)  def test_sanity_command(): +    'check ssh access is correctly working before getting further'      subprocess.run([SCRIPT, 'add', f'_acme-challenge.{DOMAIN}.', 'dummy']) -def test_success_single_domain_in_cn(csr_args, capsys): -    subj = f'/CN={DOMAIN}' -    assert_success(capsys, csr_args(subj)) - - -def test_success_wildcard_in_cn(csr_args, capsys): -    subj = f'/CN=*.{DOMAIN}' -    assert_success(capsys, csr_args(subj)) - - -def test_success_san_only(csr_args, capsys): -    subj = '/' -    san = f'DNS:{DOMAIN},DNS:www.{DOMAIN}' -    assert_success(capsys, csr_args(subj=subj, san=san)) - - -def test_success_san_only_contact(csr_args, capsys): -    subj = '/' -    san = f'DNS:{DOMAIN},DNS:www.{DOMAIN}' -    args = {'--contact': [CONTACT]} -    assert_success(capsys, csr_args(subj=subj, san=san, args=args)) - - -def test_success_san_contact(csr_args, capsys): -    subj = f'/CN={DOMAIN}' -    san = f'DNS:www.{DOMAIN}' -    args = {'--contact': [CONTACT]} -    assert_success(capsys, csr_args(subj=subj, san=san, args=args)) - +def assert_cert(capsys, args): +    captured = capsys.readouterr() +    #assert not captured.err +    #certlist = captured.out.split() -def test_success_wildcard_san(csr_args, capsys): -    subj = '/' -    san = f'DNS:{DOMAIN},DNS:*.{DOMAIN}' -    assert_success(capsys, csr_args(subj=subj, san=san)) +    if args['--separator'] is None: +        assert '\0' not in captured.out +    else: +        assert '\0' in captured.out +    for chain in captured.out.split('\0'): +        assert chain.count('-----BEGIN CERTIFICATE-----') == 2 +        assert chain.count('-----END CERTIFICATE-----') == 2 -def test_fail_weak_account_key(csr_args, keys): -    subj = f'/CN={DOMAIN}' -    args = {'--account-key': keys['weak_account_key']} -    assert_assertion(csr_args(subj, args=args)) +        textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'], +                stdin=chain) +        assert 'Issuer' in textchain +        certtool_out = subprocess.check_output([ +            'certtool', '-e', '--verify-profile', 'low'], input=chain, +            encoding='utf8') +        assert ' Verified.' in certtool_out +        assert certtool_out.count('Subject:') == 3 -def test_fail_weak_domain_key(csr_args): -    subj = f'/CN={DOMAIN}' -    assert_assertion(csr_args(subj, key_name='weak_domain_key')) +def module_main_caller(*, capsys, args, expectation): +    logging.info(f'module_main_caller({args}, {expectation})') -def test_fail_csr_by_account(csr_args): -    subj = f'/CN={DOMAIN}' -    # single, csr wrongly signed by account key -    assert_assertion(csr_args(subj, key_name='account_key')) +    logging.debug('before call to acme_dns_tiny.main()') +    with expectation: +        acme_dns_tiny.main(args) +        # check_cert is under the expectation context manager since if +        # acme_dns_tiny.main() raises, following statement must not be run. +        assert_cert(capsys, args) +    logging.debug('after call to acme_dns_tiny.main()') + + +def test_main(main_args, capsys): +    t_start = time.time() +    args = main_args[0] +    raise_expected = main_args[1] +    #print('subj', subj, 'args', args) +    expectation = does_not_raise() +    if raise_expected: +        expectation = pytest.raises(ValueError) +    module_main_caller(capsys=capsys, args=args, expectation=expectation) +    t_stop = time.time() + +    # calculate for letsencrypt rate limit (50account per hour) +    t_diff = 216 - (t_stop - t_start) +    if t_diff > 0: +        time.sleep(t_diff) | 
