summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvg <vgm+dev@devys.org>2021-01-17 21:25:43 +0100
committervg <vgm+dev@devys.org>2021-01-17 21:25:43 +0100
commit4338401912e3a415bf0a056c521cef612e29888c (patch)
tree0aa77513fa854e0388c86ef9d1e860fd39b5a4f2
parent706cffaac7aba2a4e623b4d1971d7c93d666d599 (diff)
downloadacme-dns-tiny-4338401912e3a415bf0a056c521cef612e29888c.tar.gz
acme-dns-tiny-4338401912e3a415bf0a056c521cef612e29888c.tar.bz2
acme-dns-tiny-4338401912e3a415bf0a056c521cef612e29888c.zip
add support for alternative chains
-rw-r--r--acme_dns_tiny.py46
-rw-r--r--tests/test_acme_dns_tiny.py315
2 files changed, 232 insertions, 129 deletions
diff --git a/acme_dns_tiny.py b/acme_dns_tiny.py
index 39e0be3..d89a46a 100644
--- a/acme_dns_tiny.py
+++ b/acme_dns_tiny.py
@@ -11,6 +11,7 @@ exceeding 80cols.
Usage:
acme_dns_tiny.py [--contact=MAIL]... [--quiet] [--verbose]
[--acme-directory=URL] [--ttl=SECONDS]
+ [--separator=STR]
(--account-key=PATH) (--csr=PATH) (--script=PATH)
acme_dns_tiny.py -h|--help
@@ -23,6 +24,7 @@ Options:
--acme-directory=URL where to make acme request, default is Let's Encrypt
--contact=MAIL create/update contact info (ex "mailto:x@example.com")
--ttl=SECONDS time before (re)try self check
+ --separator=STR list all chains joined by STR if specified
--script=PATH script to run to update the DNS server record
Called script:
@@ -80,6 +82,7 @@ ACME_DEFAULT_DIRECTORY = 'https://acme-v02.api.letsencrypt.org/directory'
# consistent logging: global instance share accross all this module
log = logging.getLogger('acme_dns_tiny')
+log_configure = True # useful for unit tests
def b64(b):
@@ -126,7 +129,7 @@ def self_challenge_check(*, domain=None, ttl=None, challenge=None):
time.sleep(ttl)
log.debug('Self check number %s', checknum)
try:
- responses = resolver.query(domain, rdtype='TXT').rrset
+ responses = resolver.resolve(domain, rdtype='TXT').rrset
except dns.exception.DNSException as e:
log.debug(' - DNS error: %s: %s', type(e).__name__, e)
continue
@@ -153,7 +156,7 @@ class ACME:
def generate_jws_header(self, account_key_path):
log.debug('Read account key.')
- out = openssl(['rsa', '-in', account_key_path, '-noout', '-text'])
+ out = openssl(['pkey', '-in', account_key_path, '-noout', '-text'])
pub_hex, pub_exp = re.search(
r'modulus:\r?\n\s+00:([a-f0-9\:\s]+?)'
r'\r?\npublicExponent: ([0-9]+)',
@@ -278,13 +281,20 @@ class ACME:
domain = f'_acme-challenge.{domain}.'
return domain, keydigest64, challenge['url'], keyauth
- def get_certificate(self, crt_url, ):
+ def get_chains(self, crt_url, separator=None):
+ headers = {'Accept': 'application/pem-certificate-chain'}
log.info('Signed certificate at: %s', crt_url)
- sreq = self.sreq(crt_url,
- headers={'Accept': 'application/pem-certificate-chain'})
- log.info(' - Certificate links given by server: %s',
- sreq.headers.get('link', ''))
- return sreq.text
+ sreq = self.sreq(crt_url, headers=headers)
+
+ if separator is None:
+ return sreq.text
+
+ chains = [sreq.text]
+ regex = re.compile(r'\s*<(\S+)>\s*;\s*rel\s*=\s*"alternate"\s*')
+ for match in regex.finditer(sreq.headers.get('Link', '')):
+ chains.append(self.sreq(match.group(1), headers=headers).text)
+
+ return separator.join(chains)
def get_crt(args):
@@ -307,17 +317,23 @@ def get_crt(args):
csr_der = b64(openssl(['req', '-in', args['--csr'], '-outform', 'DER']))
order = acme.validate_order(order['finalize'], order_location, csr_der)
- return acme.get_certificate(order['certificate'])
+ separator = None
+ if args['--separator'] is not None:
+ separator = args['--separator'].encode('utf8').decode('unicode_escape')
+ chains = acme.get_chains(order['certificate'], separator)
+
+ return chains
def main(args):
args = args or docopt.docopt(__doc__)
- log.addHandler(logging.StreamHandler())
- log.setLevel(
- (args['--verbose'] and logging.DEBUG) or
- (args['--quiet'] and logging.ERROR) or
- logging.INFO
- )
+ if log_configure:
+ log.addHandler(logging.StreamHandler())
+ log.setLevel(
+ (args['--verbose'] and logging.DEBUG) or
+ (args['--quiet'] and logging.ERROR) or
+ logging.INFO
+ )
print(get_crt(args), end='')
diff --git a/tests/test_acme_dns_tiny.py b/tests/test_acme_dns_tiny.py
index bb96765..0d61b97 100644
--- a/tests/test_acme_dns_tiny.py
+++ b/tests/test_acme_dns_tiny.py
@@ -2,12 +2,14 @@
import collections
+import contextlib
import inspect
import logging
import os
import subprocess
import sys
import tempfile
+import time
import pytest
@@ -18,6 +20,9 @@ sys.path.append(os.path.realpath("."))
import acme_dns_tiny
+acme_dns_tiny.log_configure = False
+
+
ACME_STAGING_DIRECTORY = \
'https://acme-staging-v02.api.letsencrypt.org/directory'
DOMAIN = os.getenv('TEST_DOMAIN')
@@ -25,149 +30,231 @@ CONTACT = os.getenv('TEST_CONTACT') # form: mailto:name@domain
SCRIPT = os.getenv('TEST_SCRIPT')
-def funcname():
- return inspect.currentframe().f_back.f_code.co_name
-
-
-def parent_funcname():
- return inspect.currentframe().f_back.f_back.f_code.co_name
-
-
-def assert_success(capsys, args):
- name = parent_funcname()
- logging.info(f'assert_success({name}, {args})')
- acme_dns_tiny.main(args)
- captured = capsys.readouterr()
- #assert not captured.err
- certlist = captured.out.split('-----BEGIN CERTIFICATE-----')
- assert len(certlist) == 3
- assert certlist[0] == ''
- assert '-----END CERTIFICATE-----\n' in certlist[1]
- assert '-----END CERTIFICATE-----\n' in certlist[2]
- textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'],
- stdin=captured.out)
- assert 'Issuer' in textchain
-
-
-def assert_assertion(args):
- name = parent_funcname()
- logging.info(f'assert_assertion({name}, {args})')
- with pytest.raises(ValueError):
- acme_dns_tiny.main(args)
+# all possible key types and raise expectation (key considered too weak)
+key_types = (
+ ('rsa', 4096, False),
+ ('rsa', 2048, False),
+ ('rsa', 1024, True), # this key type is not recommended
+ ('ecdsa', 384, False),
+ ('ecdsa', 256, False),
+)
+
+
+@contextlib.contextmanager
+def does_not_raise():
+ yield
+
+
+def _gen_rsa_key(path, size):
+ acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)])
+
+
+def _gen_ecdsa_key(path, size):
+ acme_dns_tiny.openssl([
+ 'ecparam',
+ '-genkey',
+ '-name', f'secp{size}r1',
+ '-out', path,
+ ])
+
+
+#@pytest.fixture(scope='module')
+def keys_generator():
+ #tmpdir = tmpdir_factory.mktemp("data")
+
+ def _gen_key(role, type_name, key_size):
+ name = f'{role}_{type_name}_{key_size}.key'
+ generator_map = {'rsa': _gen_rsa_key, 'ecdsa': _gen_ecdsa_key}
+ #path = str(tmpdir.join(name))
+ def operate(tmpdir):
+ path = str(tmpdir.join(name))
+ if not os.path.exists(path):
+ generator_map[type_name](path, key_size)
+ return path
+ return operate
+ #return {
+ # 'name': name,
+ # 'path': operate,
+ #}
+
+ def _gen_keys_normal(type_account, type_domain):
+ account_key = _gen_key('account', type_account[0], type_account[1])
+ domain_key = _gen_key('domain', type_domain[0], type_domain[1])
+
+ return {
+ 'account_key_path': account_key,
+ 'domain_key_path': domain_key,
+ 'raise_expected': type_account[2] or type_domain[2],
+ }
+
+ def _normal_name(type1, type2):
+ return f'{type1[0]}{type1[1]}/{type2[0]}{type2[1]}'
+
+ def _gen_keys_same(type_accountdomain):
+ keypath = _gen_key(
+ 'accountdomain', type_accountdomain[0], type_accountdomain[1])
+ return {
+ 'account_key_path': keypath,
+ 'domain_key_path': keypath,
+ 'raise_expected': True,
+ }
+
+ def _same_name(type1):
+ return f'{type1[0]}{type1[1]}/same'
+
+ from itertools import chain, product
+ yield from chain(
+ (pytest.param(_gen_keys_normal(*x), id=_normal_name(*x))
+ for x in product(key_types, key_types)
+ # TODO: as for now client does not support ec account keytype
+ # so skip it for now.
+ if 'ecdsa' not in x[0][0]),
+ (pytest.param(_gen_keys_same(x), id=_same_name(x))
+ for x in key_types
+ # TODO: as for now client does not support ec account keytype
+ # so skip it for now.
+ if 'ecdsa' not in x[0]),
+ )
+
+
+@pytest.fixture(scope='module', params=keys_generator())
+def account_domain_key(request):
+ return request.param
+
+
+@pytest.fixture(scope='module', params=[
+ {'subj': f'/CN={DOMAIN}', 'san': None},
+ {'subj': f'/CN=*.{DOMAIN}', 'san': None},
+ {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:www.{DOMAIN}'},
+ {'subj': f'/CN={DOMAIN}', 'san': f'DNS:www.{DOMAIN}'},
+ {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:*.{DOMAIN}'},
+])
+def subj_fixture(request):
+ return request.param
+
+
+CSR_ARGS_COUNTER = 0
+
+@pytest.fixture(scope='module', params=[
+ pytest.param(None, id='no_separator/no_contact'),
+ pytest.param({'args': {'--separator': '\\0'}}, id='separator'),
+ pytest.param({'args': {'--contact': [CONTACT]}}, id='contact'),
+ pytest.param({'args': {
+ '--contact': [CONTACT],
+ '--separator': '\\0',
+ }}, id='separator+contact'),
+])
+def main_args(tmpdir_factory, account_domain_key, request, subj_fixture):
+
+ global CSR_ARGS_COUNTER
+ CSR_ARGS_COUNTER += 1
-@pytest.fixture(scope='module')
-def keys(tmpdir_factory):
- tmpdir = tmpdir_factory.mktemp("data")
-
- def _gen_key(name, size):
- path = str(tmpdir.join(name))
- acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)])
- return path
-
- return {
- 'account_key': _gen_key('account_key', 4096),
- 'domain_key': _gen_key('domain_key', 4096),
- 'weak_account_key': _gen_key('weak_account_key', 1024),
- 'weak_domain_key': _gen_key('weak_domain_key', 1024),
- }
-
-
-@pytest.fixture(scope='module')
-def csr_args(tmpdir_factory, keys):
tmpdir = tmpdir_factory.mktemp("data")
+ #keysdef = account_domain_key.values[0]
+ keysdef = account_domain_key
+ #print('debug', keysdef)
default_args = {
'--acme-directory': ACME_STAGING_DIRECTORY,
'--script': SCRIPT,
- '--account-key': keys['account_key'],
+ '--account-key': keysdef['account_key_path'](tmpdir),
#'--ttl': 60, # already the default
'--ttl': None,
+ '--separator': None,
'--contact': None,
'--verbose': None,
'--quiet': None,
}
- def _csr(subj, key_name='domain_key', san=None, args={}):
- name = parent_funcname()
- domain_key = keys[key_name]
- path = str(tmpdir.join(name))
- if san is None:
- acme_dns_tiny.openssl([
- 'req', '-new', '-key', domain_key, '-subj', subj, '-out', path
- ])
- else:
- with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \
- open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig:
- conf.write(orig.read())
- conf.write(f'\n[SAN]\nsubjectAltName={san}\n')
- conf.flush()
- acme_dns_tiny.openssl([
- 'req', '-new', '-key', domain_key, '-subj', subj,
- '-reqexts', 'SAN', '-config', conf.name, '-out', path
- ])
- return {**default_args, '--csr': path, **args}
-
- return _csr
+ if request.param:
+ default_args.update(request.param.get('args', {}))
+
+ if default_args['--separator'] is not None:
+ assert default_args['--separator'] == '\\0'
+ assert default_args['--separator'].encode(
+ 'utf8').decode('unicode_escape') == '\0'
+
+ name = f'{CSR_ARGS_COUNTER:02X}'
+ domain_key = keysdef['domain_key_path'](tmpdir)
+ path = str(tmpdir.join(name))
+ with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \
+ open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig:
+ conf.write(orig.read())
+ conf.write(f'\n[SAN]\nsubjectAltName={subj_fixture["san"]}\n')
+ conf.flush()
+ openssl_san_args = []
+ if subj_fixture['san']:
+ openssl_san_args = ['-reqexts', 'SAN', '-config', conf.name]
+ acme_dns_tiny.openssl([
+ 'req', '-new', '-key', domain_key,
+ '-subj', subj_fixture['subj'], *openssl_san_args,
+ '-out', path
+ ])
+
+ return {**default_args, '--csr': path}, keysdef['raise_expected']
def test_sanity_env():
+ 'check environment is correctly set before running other tests'
assert bool(DOMAIN)
assert bool(CONTACT)
assert bool(SCRIPT)
def test_sanity_command():
+ 'check ssh access is correctly working before getting further'
subprocess.run([SCRIPT, 'add', f'_acme-challenge.{DOMAIN}.', 'dummy'])
-def test_success_single_domain_in_cn(csr_args, capsys):
- subj = f'/CN={DOMAIN}'
- assert_success(capsys, csr_args(subj))
-
-
-def test_success_wildcard_in_cn(csr_args, capsys):
- subj = f'/CN=*.{DOMAIN}'
- assert_success(capsys, csr_args(subj))
-
-
-def test_success_san_only(csr_args, capsys):
- subj = '/'
- san = f'DNS:{DOMAIN},DNS:www.{DOMAIN}'
- assert_success(capsys, csr_args(subj=subj, san=san))
-
-
-def test_success_san_only_contact(csr_args, capsys):
- subj = '/'
- san = f'DNS:{DOMAIN},DNS:www.{DOMAIN}'
- args = {'--contact': [CONTACT]}
- assert_success(capsys, csr_args(subj=subj, san=san, args=args))
-
-
-def test_success_san_contact(csr_args, capsys):
- subj = f'/CN={DOMAIN}'
- san = f'DNS:www.{DOMAIN}'
- args = {'--contact': [CONTACT]}
- assert_success(capsys, csr_args(subj=subj, san=san, args=args))
-
+def assert_cert(capsys, args):
+ captured = capsys.readouterr()
+ #assert not captured.err
+ #certlist = captured.out.split()
-def test_success_wildcard_san(csr_args, capsys):
- subj = '/'
- san = f'DNS:{DOMAIN},DNS:*.{DOMAIN}'
- assert_success(capsys, csr_args(subj=subj, san=san))
+ if args['--separator'] is None:
+ assert '\0' not in captured.out
+ else:
+ assert '\0' in captured.out
+ for chain in captured.out.split('\0'):
+ assert chain.count('-----BEGIN CERTIFICATE-----') == 2
+ assert chain.count('-----END CERTIFICATE-----') == 2
-def test_fail_weak_account_key(csr_args, keys):
- subj = f'/CN={DOMAIN}'
- args = {'--account-key': keys['weak_account_key']}
- assert_assertion(csr_args(subj, args=args))
+ textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'],
+ stdin=chain)
+ assert 'Issuer' in textchain
+ certtool_out = subprocess.check_output([
+ 'certtool', '-e', '--verify-profile', 'low'], input=chain,
+ encoding='utf8')
+ assert ' Verified.' in certtool_out
+ assert certtool_out.count('Subject:') == 3
-def test_fail_weak_domain_key(csr_args):
- subj = f'/CN={DOMAIN}'
- assert_assertion(csr_args(subj, key_name='weak_domain_key'))
+def module_main_caller(*, capsys, args, expectation):
+ logging.info(f'module_main_caller({args}, {expectation})')
-def test_fail_csr_by_account(csr_args):
- subj = f'/CN={DOMAIN}'
- # single, csr wrongly signed by account key
- assert_assertion(csr_args(subj, key_name='account_key'))
+ logging.debug('before call to acme_dns_tiny.main()')
+ with expectation:
+ acme_dns_tiny.main(args)
+ # check_cert is under the expectation context manager since if
+ # acme_dns_tiny.main() raises, following statement must not be run.
+ assert_cert(capsys, args)
+ logging.debug('after call to acme_dns_tiny.main()')
+
+
+def test_main(main_args, capsys):
+ t_start = time.time()
+ args = main_args[0]
+ raise_expected = main_args[1]
+ #print('subj', subj, 'args', args)
+ expectation = does_not_raise()
+ if raise_expected:
+ expectation = pytest.raises(ValueError)
+ module_main_caller(capsys=capsys, args=args, expectation=expectation)
+ t_stop = time.time()
+
+ # calculate for letsencrypt rate limit (50account per hour)
+ t_diff = 216 - (t_stop - t_start)
+ if t_diff > 0:
+ time.sleep(t_diff)