From 4338401912e3a415bf0a056c521cef612e29888c Mon Sep 17 00:00:00 2001 From: vg Date: Sun, 17 Jan 2021 21:25:43 +0100 Subject: add support for alternative chains --- tests/test_acme_dns_tiny.py | 315 ++++++++++++++++++++++++++++---------------- 1 file changed, 201 insertions(+), 114 deletions(-) (limited to 'tests') diff --git a/tests/test_acme_dns_tiny.py b/tests/test_acme_dns_tiny.py index bb96765..0d61b97 100644 --- a/tests/test_acme_dns_tiny.py +++ b/tests/test_acme_dns_tiny.py @@ -2,12 +2,14 @@ import collections +import contextlib import inspect import logging import os import subprocess import sys import tempfile +import time import pytest @@ -18,6 +20,9 @@ sys.path.append(os.path.realpath(".")) import acme_dns_tiny +acme_dns_tiny.log_configure = False + + ACME_STAGING_DIRECTORY = \ 'https://acme-staging-v02.api.letsencrypt.org/directory' DOMAIN = os.getenv('TEST_DOMAIN') @@ -25,149 +30,231 @@ CONTACT = os.getenv('TEST_CONTACT') # form: mailto:name@domain SCRIPT = os.getenv('TEST_SCRIPT') -def funcname(): - return inspect.currentframe().f_back.f_code.co_name - - -def parent_funcname(): - return inspect.currentframe().f_back.f_back.f_code.co_name - - -def assert_success(capsys, args): - name = parent_funcname() - logging.info(f'assert_success({name}, {args})') - acme_dns_tiny.main(args) - captured = capsys.readouterr() - #assert not captured.err - certlist = captured.out.split('-----BEGIN CERTIFICATE-----') - assert len(certlist) == 3 - assert certlist[0] == '' - assert '-----END CERTIFICATE-----\n' in certlist[1] - assert '-----END CERTIFICATE-----\n' in certlist[2] - textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'], - stdin=captured.out) - assert 'Issuer' in textchain - - -def assert_assertion(args): - name = parent_funcname() - logging.info(f'assert_assertion({name}, {args})') - with pytest.raises(ValueError): - acme_dns_tiny.main(args) +# all possible key types and raise expectation (key considered too weak) +key_types = ( + ('rsa', 4096, False), + ('rsa', 2048, False), + ('rsa', 1024, True), # this key type is not recommended + ('ecdsa', 384, False), + ('ecdsa', 256, False), +) + + +@contextlib.contextmanager +def does_not_raise(): + yield + + +def _gen_rsa_key(path, size): + acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)]) + + +def _gen_ecdsa_key(path, size): + acme_dns_tiny.openssl([ + 'ecparam', + '-genkey', + '-name', f'secp{size}r1', + '-out', path, + ]) + + +#@pytest.fixture(scope='module') +def keys_generator(): + #tmpdir = tmpdir_factory.mktemp("data") + + def _gen_key(role, type_name, key_size): + name = f'{role}_{type_name}_{key_size}.key' + generator_map = {'rsa': _gen_rsa_key, 'ecdsa': _gen_ecdsa_key} + #path = str(tmpdir.join(name)) + def operate(tmpdir): + path = str(tmpdir.join(name)) + if not os.path.exists(path): + generator_map[type_name](path, key_size) + return path + return operate + #return { + # 'name': name, + # 'path': operate, + #} + + def _gen_keys_normal(type_account, type_domain): + account_key = _gen_key('account', type_account[0], type_account[1]) + domain_key = _gen_key('domain', type_domain[0], type_domain[1]) + + return { + 'account_key_path': account_key, + 'domain_key_path': domain_key, + 'raise_expected': type_account[2] or type_domain[2], + } + + def _normal_name(type1, type2): + return f'{type1[0]}{type1[1]}/{type2[0]}{type2[1]}' + + def _gen_keys_same(type_accountdomain): + keypath = _gen_key( + 'accountdomain', type_accountdomain[0], type_accountdomain[1]) + return { + 'account_key_path': keypath, + 'domain_key_path': keypath, + 'raise_expected': True, + } + + def _same_name(type1): + return f'{type1[0]}{type1[1]}/same' + + from itertools import chain, product + yield from chain( + (pytest.param(_gen_keys_normal(*x), id=_normal_name(*x)) + for x in product(key_types, key_types) + # TODO: as for now client does not support ec account keytype + # so skip it for now. + if 'ecdsa' not in x[0][0]), + (pytest.param(_gen_keys_same(x), id=_same_name(x)) + for x in key_types + # TODO: as for now client does not support ec account keytype + # so skip it for now. + if 'ecdsa' not in x[0]), + ) + + +@pytest.fixture(scope='module', params=keys_generator()) +def account_domain_key(request): + return request.param + + +@pytest.fixture(scope='module', params=[ + {'subj': f'/CN={DOMAIN}', 'san': None}, + {'subj': f'/CN=*.{DOMAIN}', 'san': None}, + {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:www.{DOMAIN}'}, + {'subj': f'/CN={DOMAIN}', 'san': f'DNS:www.{DOMAIN}'}, + {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:*.{DOMAIN}'}, +]) +def subj_fixture(request): + return request.param + + +CSR_ARGS_COUNTER = 0 + +@pytest.fixture(scope='module', params=[ + pytest.param(None, id='no_separator/no_contact'), + pytest.param({'args': {'--separator': '\\0'}}, id='separator'), + pytest.param({'args': {'--contact': [CONTACT]}}, id='contact'), + pytest.param({'args': { + '--contact': [CONTACT], + '--separator': '\\0', + }}, id='separator+contact'), +]) +def main_args(tmpdir_factory, account_domain_key, request, subj_fixture): + + global CSR_ARGS_COUNTER + CSR_ARGS_COUNTER += 1 -@pytest.fixture(scope='module') -def keys(tmpdir_factory): - tmpdir = tmpdir_factory.mktemp("data") - - def _gen_key(name, size): - path = str(tmpdir.join(name)) - acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)]) - return path - - return { - 'account_key': _gen_key('account_key', 4096), - 'domain_key': _gen_key('domain_key', 4096), - 'weak_account_key': _gen_key('weak_account_key', 1024), - 'weak_domain_key': _gen_key('weak_domain_key', 1024), - } - - -@pytest.fixture(scope='module') -def csr_args(tmpdir_factory, keys): tmpdir = tmpdir_factory.mktemp("data") + #keysdef = account_domain_key.values[0] + keysdef = account_domain_key + #print('debug', keysdef) default_args = { '--acme-directory': ACME_STAGING_DIRECTORY, '--script': SCRIPT, - '--account-key': keys['account_key'], + '--account-key': keysdef['account_key_path'](tmpdir), #'--ttl': 60, # already the default '--ttl': None, + '--separator': None, '--contact': None, '--verbose': None, '--quiet': None, } - def _csr(subj, key_name='domain_key', san=None, args={}): - name = parent_funcname() - domain_key = keys[key_name] - path = str(tmpdir.join(name)) - if san is None: - acme_dns_tiny.openssl([ - 'req', '-new', '-key', domain_key, '-subj', subj, '-out', path - ]) - else: - with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \ - open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig: - conf.write(orig.read()) - conf.write(f'\n[SAN]\nsubjectAltName={san}\n') - conf.flush() - acme_dns_tiny.openssl([ - 'req', '-new', '-key', domain_key, '-subj', subj, - '-reqexts', 'SAN', '-config', conf.name, '-out', path - ]) - return {**default_args, '--csr': path, **args} - - return _csr + if request.param: + default_args.update(request.param.get('args', {})) + + if default_args['--separator'] is not None: + assert default_args['--separator'] == '\\0' + assert default_args['--separator'].encode( + 'utf8').decode('unicode_escape') == '\0' + + name = f'{CSR_ARGS_COUNTER:02X}' + domain_key = keysdef['domain_key_path'](tmpdir) + path = str(tmpdir.join(name)) + with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \ + open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig: + conf.write(orig.read()) + conf.write(f'\n[SAN]\nsubjectAltName={subj_fixture["san"]}\n') + conf.flush() + openssl_san_args = [] + if subj_fixture['san']: + openssl_san_args = ['-reqexts', 'SAN', '-config', conf.name] + acme_dns_tiny.openssl([ + 'req', '-new', '-key', domain_key, + '-subj', subj_fixture['subj'], *openssl_san_args, + '-out', path + ]) + + return {**default_args, '--csr': path}, keysdef['raise_expected'] def test_sanity_env(): + 'check environment is correctly set before running other tests' assert bool(DOMAIN) assert bool(CONTACT) assert bool(SCRIPT) def test_sanity_command(): + 'check ssh access is correctly working before getting further' subprocess.run([SCRIPT, 'add', f'_acme-challenge.{DOMAIN}.', 'dummy']) -def test_success_single_domain_in_cn(csr_args, capsys): - subj = f'/CN={DOMAIN}' - assert_success(capsys, csr_args(subj)) - - -def test_success_wildcard_in_cn(csr_args, capsys): - subj = f'/CN=*.{DOMAIN}' - assert_success(capsys, csr_args(subj)) - - -def test_success_san_only(csr_args, capsys): - subj = '/' - san = f'DNS:{DOMAIN},DNS:www.{DOMAIN}' - assert_success(capsys, csr_args(subj=subj, san=san)) - - -def test_success_san_only_contact(csr_args, capsys): - subj = '/' - san = f'DNS:{DOMAIN},DNS:www.{DOMAIN}' - args = {'--contact': [CONTACT]} - assert_success(capsys, csr_args(subj=subj, san=san, args=args)) - - -def test_success_san_contact(csr_args, capsys): - subj = f'/CN={DOMAIN}' - san = f'DNS:www.{DOMAIN}' - args = {'--contact': [CONTACT]} - assert_success(capsys, csr_args(subj=subj, san=san, args=args)) - +def assert_cert(capsys, args): + captured = capsys.readouterr() + #assert not captured.err + #certlist = captured.out.split() -def test_success_wildcard_san(csr_args, capsys): - subj = '/' - san = f'DNS:{DOMAIN},DNS:*.{DOMAIN}' - assert_success(capsys, csr_args(subj=subj, san=san)) + if args['--separator'] is None: + assert '\0' not in captured.out + else: + assert '\0' in captured.out + for chain in captured.out.split('\0'): + assert chain.count('-----BEGIN CERTIFICATE-----') == 2 + assert chain.count('-----END CERTIFICATE-----') == 2 -def test_fail_weak_account_key(csr_args, keys): - subj = f'/CN={DOMAIN}' - args = {'--account-key': keys['weak_account_key']} - assert_assertion(csr_args(subj, args=args)) + textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'], + stdin=chain) + assert 'Issuer' in textchain + certtool_out = subprocess.check_output([ + 'certtool', '-e', '--verify-profile', 'low'], input=chain, + encoding='utf8') + assert ' Verified.' in certtool_out + assert certtool_out.count('Subject:') == 3 -def test_fail_weak_domain_key(csr_args): - subj = f'/CN={DOMAIN}' - assert_assertion(csr_args(subj, key_name='weak_domain_key')) +def module_main_caller(*, capsys, args, expectation): + logging.info(f'module_main_caller({args}, {expectation})') -def test_fail_csr_by_account(csr_args): - subj = f'/CN={DOMAIN}' - # single, csr wrongly signed by account key - assert_assertion(csr_args(subj, key_name='account_key')) + logging.debug('before call to acme_dns_tiny.main()') + with expectation: + acme_dns_tiny.main(args) + # check_cert is under the expectation context manager since if + # acme_dns_tiny.main() raises, following statement must not be run. + assert_cert(capsys, args) + logging.debug('after call to acme_dns_tiny.main()') + + +def test_main(main_args, capsys): + t_start = time.time() + args = main_args[0] + raise_expected = main_args[1] + #print('subj', subj, 'args', args) + expectation = does_not_raise() + if raise_expected: + expectation = pytest.raises(ValueError) + module_main_caller(capsys=capsys, args=args, expectation=expectation) + t_stop = time.time() + + # calculate for letsencrypt rate limit (50account per hour) + t_diff = 216 - (t_stop - t_start) + if t_diff > 0: + time.sleep(t_diff) -- cgit v1.2.3