#!python3 import collections import contextlib import inspect import logging import os import subprocess import sys import tempfile import time import pytest # append cwd to module importable paths (pytest-3 should be executed from main # directory). sys.path.append(os.path.realpath(".")) import acme_dns_tiny acme_dns_tiny.log_configure = False ACME_STAGING_DIRECTORY = \ 'https://acme-staging-v02.api.letsencrypt.org/directory' DOMAIN = os.getenv('TEST_DOMAIN') CONTACT = os.getenv('TEST_CONTACT') # form: mailto:name@domain SCRIPT = os.getenv('TEST_SCRIPT') # all possible key types and raise expectation (key considered too weak) key_types = ( ('rsa', 4096, False), ('rsa', 2048, False), ('rsa', 1024, True), # this key type is not recommended ('ecdsa', 384, False), ('ecdsa', 256, False), ) # the first case of success expectation, try also to test the anti-replay # nonce expiration retry mechanism by waiting for it to expire (more than # 5min). first_success_case_nonce_timeout_done = False original_sreq_method = acme_dns_tiny.ACME.sreq nonce_expiration_sreq_wrapper_counter_max = 3 nonce_expiration_sreq_wrapper_counter = nonce_expiration_sreq_wrapper_counter_max def nonce_expiration_sreq_wrapper(*args, **kwargs): global nonce_expiration_sreq_wrapper_counter nonce_expiration_sreq_wrapper_counter -= 1 if nonce_expiration_sreq_wrapper_counter <= 0: nonce_expiration_sreq_wrapper_counter = nonce_expiration_sreq_wrapper_counter_max wait = 30 logging.info('waiting %dmin for nonce expiration test', wait) for i in range(wait, 0, -1): logging.info('%d min left', i) time.sleep(60) acme_dns_tiny.ACME.sreq = original_sreq_method # reset original method return original_sreq_method(*args, **kwargs) @contextlib.contextmanager def does_not_raise(): yield def _gen_rsa_key(path, size): acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)]) def _gen_ecdsa_key(path, size): acme_dns_tiny.openssl([ 'ecparam', '-genkey', '-name', f'secp{size}r1', '-out', path, ]) #@pytest.fixture(scope='module') def keys_generator(): #tmpdir = tmpdir_factory.mktemp("data") def _gen_key(role, type_name, key_size): name = f'{role}_{type_name}_{key_size}.key' generator_map = {'rsa': _gen_rsa_key, 'ecdsa': _gen_ecdsa_key} #path = str(tmpdir.join(name)) def operate(tmpdir): path = str(tmpdir.join(name)) if not os.path.exists(path): generator_map[type_name](path, key_size) return path return operate #return { # 'name': name, # 'path': operate, #} def _gen_keys_normal(type_account, type_domain): account_key = _gen_key('account', type_account[0], type_account[1]) domain_key = _gen_key('domain', type_domain[0], type_domain[1]) return { 'account_key_path': account_key, 'domain_key_path': domain_key, 'raise_expected': type_account[2] or type_domain[2], } def _normal_name(type1, type2): return f'{type1[0]}{type1[1]}/{type2[0]}{type2[1]}' def _gen_keys_same(type_accountdomain): keypath = _gen_key( 'accountdomain', type_accountdomain[0], type_accountdomain[1]) return { 'account_key_path': keypath, 'domain_key_path': keypath, 'raise_expected': True, } def _same_name(type1): return f'{type1[0]}{type1[1]}/same' from itertools import chain, product yield from chain( (pytest.param(_gen_keys_normal(*x), id=_normal_name(*x)) for x in product(key_types, key_types) # TODO: as for now client does not support ec account keytype # so skip it for now. if 'ecdsa' not in x[0][0]), (pytest.param(_gen_keys_same(x), id=_same_name(x)) for x in key_types # TODO: as for now client does not support ec account keytype # so skip it for now. if 'ecdsa' not in x[0]), ) @pytest.fixture(scope='module', params=keys_generator()) def account_domain_key(request): return request.param @pytest.fixture(scope='module', params=[ {'subj': f'/CN={DOMAIN}', 'san': None}, {'subj': f'/CN=*.{DOMAIN}', 'san': None}, {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:www.{DOMAIN}'}, {'subj': f'/CN={DOMAIN}', 'san': f'DNS:www.{DOMAIN}'}, {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:*.{DOMAIN}'}, ]) def subj_fixture(request): return request.param CSR_ARGS_COUNTER = 0 @pytest.fixture(scope='module', params=[ pytest.param(None, id='no_separator/no_contact'), pytest.param({'args': {'--separator': '\\0'}}, id='separator'), pytest.param({'args': {'--contact': [CONTACT]}}, id='contact'), pytest.param({'args': { '--contact': [CONTACT], '--separator': '\\0', }}, id='separator+contact'), ]) def main_args(tmpdir_factory, account_domain_key, request, subj_fixture): global CSR_ARGS_COUNTER CSR_ARGS_COUNTER += 1 tmpdir = tmpdir_factory.mktemp("data") #keysdef = account_domain_key.values[0] keysdef = account_domain_key #print('debug', keysdef) default_args = { '--acme-directory': ACME_STAGING_DIRECTORY, '--script': SCRIPT, '--account-key': keysdef['account_key_path'](tmpdir), #'--ttl': 60, # already the default '--ttl': None, '--separator': None, '--contact': None, '--verbose': None, '--quiet': None, } if request.param: default_args.update(request.param.get('args', {})) if default_args['--separator'] is not None: assert default_args['--separator'] == '\\0' assert default_args['--separator'].encode( 'utf8').decode('unicode_escape') == '\0' name = f'{CSR_ARGS_COUNTER:02X}' domain_key = keysdef['domain_key_path'](tmpdir) path = str(tmpdir.join(name)) with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \ open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig: conf.write(orig.read()) conf.write(f'\n[SAN]\nsubjectAltName={subj_fixture["san"]}\n') conf.flush() openssl_san_args = [] if subj_fixture['san']: openssl_san_args = ['-reqexts', 'SAN', '-config', conf.name] acme_dns_tiny.openssl([ 'req', '-new', '-key', domain_key, '-subj', subj_fixture['subj'], *openssl_san_args, '-out', path ]) return {**default_args, '--csr': path}, keysdef['raise_expected'] def test_sanity_env(): 'check environment is correctly set before running other tests' assert bool(DOMAIN) assert bool(CONTACT) assert bool(SCRIPT) def test_sanity_command(): 'check ssh access is correctly working before getting further' subprocess.run([SCRIPT, 'add', f'_acme-challenge.{DOMAIN}.', 'dummy']) def assert_cert(capsys, args): captured = capsys.readouterr() #assert not captured.err #certlist = captured.out.split() logging.debug('captured stdout %s', captured.out) logging.debug('captured stderr %s', captured.err) if args['--separator'] is None: assert '\0' not in captured.out else: assert '\0' in captured.out for chain in captured.out.split('\0'): assert chain.count('-----BEGIN CERTIFICATE-----') >= 2 assert chain.count('-----END CERTIFICATE-----') >= 2 textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'], stdin=chain) assert 'Issuer' in textchain certtool_out = subprocess.check_output([ 'certtool', '-e', '--verify-profile', 'low'], input=chain, encoding='utf8') assert ' Verified.' in certtool_out assert certtool_out.count('Subject:') >= 3 def module_main_caller(*, capsys, args, expectation, do_expire_nonce): logging.info(f'module_main_caller({args}, {expectation})') logging.debug('before call to acme_dns_tiny.main()') with expectation: acme_dns_tiny.ACME.sreq = original_sreq_method if do_expire_nonce: logging.info('doing expire nonce test') first_success_case_nonce_timeout_done = True acme_dns_tiny.ACME.sreq = nonce_expiration_sreq_wrapper acme_dns_tiny.main(args) # check_cert is under the expectation context manager since if # acme_dns_tiny.main() raises, following statement must not be run. assert_cert(capsys, args) logging.debug('after call to acme_dns_tiny.main()') def test_main(main_args, capsys): t_start = time.time() args = main_args[0] raise_expected = main_args[1] do_expire_nonce = False #print('subj', subj, 'args', args) expectation = does_not_raise() if raise_expected: expectation = pytest.raises(ValueError) elif not first_success_case_nonce_timeout_done: do_expire_nonce = True module_main_caller(capsys=capsys, args=args, expectation=expectation, do_expire_nonce=do_expire_nonce) t_stop = time.time() # calculate for letsencrypt rate limit (50account per hour) t_diff = 216 - (t_stop - t_start) if t_diff > 0: time.sleep(t_diff)