From 4338401912e3a415bf0a056c521cef612e29888c Mon Sep 17 00:00:00 2001 From: vg Date: Sun, 17 Jan 2021 21:25:43 +0100 Subject: add support for alternative chains --- acme_dns_tiny.py | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) (limited to 'acme_dns_tiny.py') diff --git a/acme_dns_tiny.py b/acme_dns_tiny.py index 39e0be3..d89a46a 100644 --- a/acme_dns_tiny.py +++ b/acme_dns_tiny.py @@ -11,6 +11,7 @@ exceeding 80cols. Usage: acme_dns_tiny.py [--contact=MAIL]... [--quiet] [--verbose] [--acme-directory=URL] [--ttl=SECONDS] + [--separator=STR] (--account-key=PATH) (--csr=PATH) (--script=PATH) acme_dns_tiny.py -h|--help @@ -23,6 +24,7 @@ Options: --acme-directory=URL where to make acme request, default is Let's Encrypt --contact=MAIL create/update contact info (ex "mailto:x@example.com") --ttl=SECONDS time before (re)try self check + --separator=STR list all chains joined by STR if specified --script=PATH script to run to update the DNS server record Called script: @@ -80,6 +82,7 @@ ACME_DEFAULT_DIRECTORY = 'https://acme-v02.api.letsencrypt.org/directory' # consistent logging: global instance share accross all this module log = logging.getLogger('acme_dns_tiny') +log_configure = True # useful for unit tests def b64(b): @@ -126,7 +129,7 @@ def self_challenge_check(*, domain=None, ttl=None, challenge=None): time.sleep(ttl) log.debug('Self check number %s', checknum) try: - responses = resolver.query(domain, rdtype='TXT').rrset + responses = resolver.resolve(domain, rdtype='TXT').rrset except dns.exception.DNSException as e: log.debug(' - DNS error: %s: %s', type(e).__name__, e) continue @@ -153,7 +156,7 @@ class ACME: def generate_jws_header(self, account_key_path): log.debug('Read account key.') - out = openssl(['rsa', '-in', account_key_path, '-noout', '-text']) + out = openssl(['pkey', '-in', account_key_path, '-noout', '-text']) pub_hex, pub_exp = re.search( r'modulus:\r?\n\s+00:([a-f0-9\:\s]+?)' r'\r?\npublicExponent: ([0-9]+)', @@ -278,13 +281,20 @@ class ACME: domain = f'_acme-challenge.{domain}.' return domain, keydigest64, challenge['url'], keyauth - def get_certificate(self, crt_url, ): + def get_chains(self, crt_url, separator=None): + headers = {'Accept': 'application/pem-certificate-chain'} log.info('Signed certificate at: %s', crt_url) - sreq = self.sreq(crt_url, - headers={'Accept': 'application/pem-certificate-chain'}) - log.info(' - Certificate links given by server: %s', - sreq.headers.get('link', '')) - return sreq.text + sreq = self.sreq(crt_url, headers=headers) + + if separator is None: + return sreq.text + + chains = [sreq.text] + regex = re.compile(r'\s*<(\S+)>\s*;\s*rel\s*=\s*"alternate"\s*') + for match in regex.finditer(sreq.headers.get('Link', '')): + chains.append(self.sreq(match.group(1), headers=headers).text) + + return separator.join(chains) def get_crt(args): @@ -307,17 +317,23 @@ def get_crt(args): csr_der = b64(openssl(['req', '-in', args['--csr'], '-outform', 'DER'])) order = acme.validate_order(order['finalize'], order_location, csr_der) - return acme.get_certificate(order['certificate']) + separator = None + if args['--separator'] is not None: + separator = args['--separator'].encode('utf8').decode('unicode_escape') + chains = acme.get_chains(order['certificate'], separator) + + return chains def main(args): args = args or docopt.docopt(__doc__) - log.addHandler(logging.StreamHandler()) - log.setLevel( - (args['--verbose'] and logging.DEBUG) or - (args['--quiet'] and logging.ERROR) or - logging.INFO - ) + if log_configure: + log.addHandler(logging.StreamHandler()) + log.setLevel( + (args['--verbose'] and logging.DEBUG) or + (args['--quiet'] and logging.ERROR) or + logging.INFO + ) print(get_crt(args), end='') -- cgit v1.2.3