From 4338401912e3a415bf0a056c521cef612e29888c Mon Sep 17 00:00:00 2001 From: vg Date: Sun, 17 Jan 2021 21:25:43 +0100 Subject: add support for alternative chains --- acme_dns_tiny.py | 46 ++++--- tests/test_acme_dns_tiny.py | 315 ++++++++++++++++++++++++++++---------------- 2 files changed, 232 insertions(+), 129 deletions(-) diff --git a/acme_dns_tiny.py b/acme_dns_tiny.py index 39e0be3..d89a46a 100644 --- a/acme_dns_tiny.py +++ b/acme_dns_tiny.py @@ -11,6 +11,7 @@ exceeding 80cols. Usage: acme_dns_tiny.py [--contact=MAIL]... [--quiet] [--verbose] [--acme-directory=URL] [--ttl=SECONDS] + [--separator=STR] (--account-key=PATH) (--csr=PATH) (--script=PATH) acme_dns_tiny.py -h|--help @@ -23,6 +24,7 @@ Options: --acme-directory=URL where to make acme request, default is Let's Encrypt --contact=MAIL create/update contact info (ex "mailto:x@example.com") --ttl=SECONDS time before (re)try self check + --separator=STR list all chains joined by STR if specified --script=PATH script to run to update the DNS server record Called script: @@ -80,6 +82,7 @@ ACME_DEFAULT_DIRECTORY = 'https://acme-v02.api.letsencrypt.org/directory' # consistent logging: global instance share accross all this module log = logging.getLogger('acme_dns_tiny') +log_configure = True # useful for unit tests def b64(b): @@ -126,7 +129,7 @@ def self_challenge_check(*, domain=None, ttl=None, challenge=None): time.sleep(ttl) log.debug('Self check number %s', checknum) try: - responses = resolver.query(domain, rdtype='TXT').rrset + responses = resolver.resolve(domain, rdtype='TXT').rrset except dns.exception.DNSException as e: log.debug(' - DNS error: %s: %s', type(e).__name__, e) continue @@ -153,7 +156,7 @@ class ACME: def generate_jws_header(self, account_key_path): log.debug('Read account key.') - out = openssl(['rsa', '-in', account_key_path, '-noout', '-text']) + out = openssl(['pkey', '-in', account_key_path, '-noout', '-text']) pub_hex, pub_exp = re.search( r'modulus:\r?\n\s+00:([a-f0-9\:\s]+?)' r'\r?\npublicExponent: ([0-9]+)', @@ -278,13 +281,20 @@ class ACME: domain = f'_acme-challenge.{domain}.' return domain, keydigest64, challenge['url'], keyauth - def get_certificate(self, crt_url, ): + def get_chains(self, crt_url, separator=None): + headers = {'Accept': 'application/pem-certificate-chain'} log.info('Signed certificate at: %s', crt_url) - sreq = self.sreq(crt_url, - headers={'Accept': 'application/pem-certificate-chain'}) - log.info(' - Certificate links given by server: %s', - sreq.headers.get('link', '')) - return sreq.text + sreq = self.sreq(crt_url, headers=headers) + + if separator is None: + return sreq.text + + chains = [sreq.text] + regex = re.compile(r'\s*<(\S+)>\s*;\s*rel\s*=\s*"alternate"\s*') + for match in regex.finditer(sreq.headers.get('Link', '')): + chains.append(self.sreq(match.group(1), headers=headers).text) + + return separator.join(chains) def get_crt(args): @@ -307,17 +317,23 @@ def get_crt(args): csr_der = b64(openssl(['req', '-in', args['--csr'], '-outform', 'DER'])) order = acme.validate_order(order['finalize'], order_location, csr_der) - return acme.get_certificate(order['certificate']) + separator = None + if args['--separator'] is not None: + separator = args['--separator'].encode('utf8').decode('unicode_escape') + chains = acme.get_chains(order['certificate'], separator) + + return chains def main(args): args = args or docopt.docopt(__doc__) - log.addHandler(logging.StreamHandler()) - log.setLevel( - (args['--verbose'] and logging.DEBUG) or - (args['--quiet'] and logging.ERROR) or - logging.INFO - ) + if log_configure: + log.addHandler(logging.StreamHandler()) + log.setLevel( + (args['--verbose'] and logging.DEBUG) or + (args['--quiet'] and logging.ERROR) or + logging.INFO + ) print(get_crt(args), end='') diff --git a/tests/test_acme_dns_tiny.py b/tests/test_acme_dns_tiny.py index bb96765..0d61b97 100644 --- a/tests/test_acme_dns_tiny.py +++ b/tests/test_acme_dns_tiny.py @@ -2,12 +2,14 @@ import collections +import contextlib import inspect import logging import os import subprocess import sys import tempfile +import time import pytest @@ -18,6 +20,9 @@ sys.path.append(os.path.realpath(".")) import acme_dns_tiny +acme_dns_tiny.log_configure = False + + ACME_STAGING_DIRECTORY = \ 'https://acme-staging-v02.api.letsencrypt.org/directory' DOMAIN = os.getenv('TEST_DOMAIN') @@ -25,149 +30,231 @@ CONTACT = os.getenv('TEST_CONTACT') # form: mailto:name@domain SCRIPT = os.getenv('TEST_SCRIPT') -def funcname(): - return inspect.currentframe().f_back.f_code.co_name - - -def parent_funcname(): - return inspect.currentframe().f_back.f_back.f_code.co_name - - -def assert_success(capsys, args): - name = parent_funcname() - logging.info(f'assert_success({name}, {args})') - acme_dns_tiny.main(args) - captured = capsys.readouterr() - #assert not captured.err - certlist = captured.out.split('-----BEGIN CERTIFICATE-----') - assert len(certlist) == 3 - assert certlist[0] == '' - assert '-----END CERTIFICATE-----\n' in certlist[1] - assert '-----END CERTIFICATE-----\n' in certlist[2] - textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'], - stdin=captured.out) - assert 'Issuer' in textchain - - -def assert_assertion(args): - name = parent_funcname() - logging.info(f'assert_assertion({name}, {args})') - with pytest.raises(ValueError): - acme_dns_tiny.main(args) +# all possible key types and raise expectation (key considered too weak) +key_types = ( + ('rsa', 4096, False), + ('rsa', 2048, False), + ('rsa', 1024, True), # this key type is not recommended + ('ecdsa', 384, False), + ('ecdsa', 256, False), +) + + +@contextlib.contextmanager +def does_not_raise(): + yield + + +def _gen_rsa_key(path, size): + acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)]) + + +def _gen_ecdsa_key(path, size): + acme_dns_tiny.openssl([ + 'ecparam', + '-genkey', + '-name', f'secp{size}r1', + '-out', path, + ]) + + +#@pytest.fixture(scope='module') +def keys_generator(): + #tmpdir = tmpdir_factory.mktemp("data") + + def _gen_key(role, type_name, key_size): + name = f'{role}_{type_name}_{key_size}.key' + generator_map = {'rsa': _gen_rsa_key, 'ecdsa': _gen_ecdsa_key} + #path = str(tmpdir.join(name)) + def operate(tmpdir): + path = str(tmpdir.join(name)) + if not os.path.exists(path): + generator_map[type_name](path, key_size) + return path + return operate + #return { + # 'name': name, + # 'path': operate, + #} + + def _gen_keys_normal(type_account, type_domain): + account_key = _gen_key('account', type_account[0], type_account[1]) + domain_key = _gen_key('domain', type_domain[0], type_domain[1]) + + return { + 'account_key_path': account_key, + 'domain_key_path': domain_key, + 'raise_expected': type_account[2] or type_domain[2], + } + + def _normal_name(type1, type2): + return f'{type1[0]}{type1[1]}/{type2[0]}{type2[1]}' + + def _gen_keys_same(type_accountdomain): + keypath = _gen_key( + 'accountdomain', type_accountdomain[0], type_accountdomain[1]) + return { + 'account_key_path': keypath, + 'domain_key_path': keypath, + 'raise_expected': True, + } + + def _same_name(type1): + return f'{type1[0]}{type1[1]}/same' + + from itertools import chain, product + yield from chain( + (pytest.param(_gen_keys_normal(*x), id=_normal_name(*x)) + for x in product(key_types, key_types) + # TODO: as for now client does not support ec account keytype + # so skip it for now. + if 'ecdsa' not in x[0][0]), + (pytest.param(_gen_keys_same(x), id=_same_name(x)) + for x in key_types + # TODO: as for now client does not support ec account keytype + # so skip it for now. + if 'ecdsa' not in x[0]), + ) + + +@pytest.fixture(scope='module', params=keys_generator()) +def account_domain_key(request): + return request.param + + +@pytest.fixture(scope='module', params=[ + {'subj': f'/CN={DOMAIN}', 'san': None}, + {'subj': f'/CN=*.{DOMAIN}', 'san': None}, + {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:www.{DOMAIN}'}, + {'subj': f'/CN={DOMAIN}', 'san': f'DNS:www.{DOMAIN}'}, + {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:*.{DOMAIN}'}, +]) +def subj_fixture(request): + return request.param + + +CSR_ARGS_COUNTER = 0 + +@pytest.fixture(scope='module', params=[ + pytest.param(None, id='no_separator/no_contact'), + pytest.param({'args': {'--separator': '\\0'}}, id='separator'), + pytest.param({'args': {'--contact': [CONTACT]}}, id='contact'), + pytest.param({'args': { + '--contact': [CONTACT], + '--separator': '\\0', + }}, id='separator+contact'), +]) +def main_args(tmpdir_factory, account_domain_key, request, subj_fixture): + + global CSR_ARGS_COUNTER + CSR_ARGS_COUNTER += 1 -@pytest.fixture(scope='module') -def keys(tmpdir_factory): - tmpdir = tmpdir_factory.mktemp("data") - - def _gen_key(name, size): - path = str(tmpdir.join(name)) - acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)]) - return path - - return { - 'account_key': _gen_key('account_key', 4096), - 'domain_key': _gen_key('domain_key', 4096), - 'weak_account_key': _gen_key('weak_account_key', 1024), - 'weak_domain_key': _gen_key('weak_domain_key', 1024), - } - - -@pytest.fixture(scope='module') -def csr_args(tmpdir_factory, keys): tmpdir = tmpdir_factory.mktemp("data") + #keysdef = account_domain_key.values[0] + keysdef = account_domain_key + #print('debug', keysdef) default_args = { '--acme-directory': ACME_STAGING_DIRECTORY, '--script': SCRIPT, - '--account-key': keys['account_key'], + '--account-key': keysdef['account_key_path'](tmpdir), #'--ttl': 60, # already the default '--ttl': None, + '--separator': None, '--contact': None, '--verbose': None, '--quiet': None, } - def _csr(subj, key_name='domain_key', san=None, args={}): - name = parent_funcname() - domain_key = keys[key_name] - path = str(tmpdir.join(name)) - if san is None: - acme_dns_tiny.openssl([ - 'req', '-new', '-key', domain_key, '-subj', subj, '-out', path - ]) - else: - with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \ - open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig: - conf.write(orig.read()) - conf.write(f'\n[SAN]\nsubjectAltName={san}\n') - conf.flush() - acme_dns_tiny.openssl([ - 'req', '-new', '-key', domain_key, '-subj', subj, - '-reqexts', 'SAN', '-config', conf.name, '-out', path - ]) - return {**default_args, '--csr': path, **args} - - return _csr + if request.param: + default_args.update(request.param.get('args', {})) + + if default_args['--separator'] is not None: + assert default_args['--separator'] == '\\0' + assert default_args['--separator'].encode( + 'utf8').decode('unicode_escape') == '\0' + + name = f'{CSR_ARGS_COUNTER:02X}' + domain_key = keysdef['domain_key_path'](tmpdir) + path = str(tmpdir.join(name)) + with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \ + open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig: + conf.write(orig.read()) + conf.write(f'\n[SAN]\nsubjectAltName={subj_fixture["san"]}\n') + conf.flush() + openssl_san_args = [] + if subj_fixture['san']: + openssl_san_args = ['-reqexts', 'SAN', '-config', conf.name] + acme_dns_tiny.openssl([ + 'req', '-new', '-key', domain_key, + '-subj', subj_fixture['subj'], *openssl_san_args, + '-out', path + ]) + + return {**default_args, '--csr': path}, keysdef['raise_expected'] def test_sanity_env(): + 'check environment is correctly set before running other tests' assert bool(DOMAIN) assert bool(CONTACT) assert bool(SCRIPT) def test_sanity_command(): + 'check ssh access is correctly working before getting further' subprocess.run([SCRIPT, 'add', f'_acme-challenge.{DOMAIN}.', 'dummy']) -def test_success_single_domain_in_cn(csr_args, capsys): - subj = f'/CN={DOMAIN}' - assert_success(capsys, csr_args(subj)) - - -def test_success_wildcard_in_cn(csr_args, capsys): - subj = f'/CN=*.{DOMAIN}' - assert_success(capsys, csr_args(subj)) - - -def test_success_san_only(csr_args, capsys): - subj = '/' - san = f'DNS:{DOMAIN},DNS:www.{DOMAIN}' - assert_success(capsys, csr_args(subj=subj, san=san)) - - -def test_success_san_only_contact(csr_args, capsys): - subj = '/' - san = f'DNS:{DOMAIN},DNS:www.{DOMAIN}' - args = {'--contact': [CONTACT]} - assert_success(capsys, csr_args(subj=subj, san=san, args=args)) - - -def test_success_san_contact(csr_args, capsys): - subj = f'/CN={DOMAIN}' - san = f'DNS:www.{DOMAIN}' - args = {'--contact': [CONTACT]} - assert_success(capsys, csr_args(subj=subj, san=san, args=args)) - +def assert_cert(capsys, args): + captured = capsys.readouterr() + #assert not captured.err + #certlist = captured.out.split() -def test_success_wildcard_san(csr_args, capsys): - subj = '/' - san = f'DNS:{DOMAIN},DNS:*.{DOMAIN}' - assert_success(capsys, csr_args(subj=subj, san=san)) + if args['--separator'] is None: + assert '\0' not in captured.out + else: + assert '\0' in captured.out + for chain in captured.out.split('\0'): + assert chain.count('-----BEGIN CERTIFICATE-----') == 2 + assert chain.count('-----END CERTIFICATE-----') == 2 -def test_fail_weak_account_key(csr_args, keys): - subj = f'/CN={DOMAIN}' - args = {'--account-key': keys['weak_account_key']} - assert_assertion(csr_args(subj, args=args)) + textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'], + stdin=chain) + assert 'Issuer' in textchain + certtool_out = subprocess.check_output([ + 'certtool', '-e', '--verify-profile', 'low'], input=chain, + encoding='utf8') + assert ' Verified.' in certtool_out + assert certtool_out.count('Subject:') == 3 -def test_fail_weak_domain_key(csr_args): - subj = f'/CN={DOMAIN}' - assert_assertion(csr_args(subj, key_name='weak_domain_key')) +def module_main_caller(*, capsys, args, expectation): + logging.info(f'module_main_caller({args}, {expectation})') -def test_fail_csr_by_account(csr_args): - subj = f'/CN={DOMAIN}' - # single, csr wrongly signed by account key - assert_assertion(csr_args(subj, key_name='account_key')) + logging.debug('before call to acme_dns_tiny.main()') + with expectation: + acme_dns_tiny.main(args) + # check_cert is under the expectation context manager since if + # acme_dns_tiny.main() raises, following statement must not be run. + assert_cert(capsys, args) + logging.debug('after call to acme_dns_tiny.main()') + + +def test_main(main_args, capsys): + t_start = time.time() + args = main_args[0] + raise_expected = main_args[1] + #print('subj', subj, 'args', args) + expectation = does_not_raise() + if raise_expected: + expectation = pytest.raises(ValueError) + module_main_caller(capsys=capsys, args=args, expectation=expectation) + t_stop = time.time() + + # calculate for letsencrypt rate limit (50account per hour) + t_diff = 216 - (t_stop - t_start) + if t_diff > 0: + time.sleep(t_diff) -- cgit v1.2.3