#!/usr/bin/env python3
#
"""\
Tiny ACME client to get TLS certificate by responding to DNS challenges.

It uses Let's Encrypt servers by default.
As the script requires access to your private ACME account key and dns server,
please read through it. It's a single short file with no line exceeding
80cols.

Usage:
  acme_dns_tiny.py [--contact=MAIL]... [--quiet] [--verbose]
                   [--acme-directory=URL] [--ttl=SECONDS]
                   (--account-key=PATH) (--csr=PATH) (--script=PATH)
  acme_dns_tiny.py -h|--help

Options:
  --account-key=PATH    path to your Let's Encrypt account private key
  --csr=PATH            path to your certificate signing request
  --quiet               suppress output except for errors
  --verbose             show all debug information on stderr
  --acme-directory=URL  where to make acme request, default is Let's Encrypt
  --contact=MAIL        create/update mail contact information
  --ttl=SECONDS         time before (re)try self check
  --script=PATH         script to run to update the DNS server record

Called script:
  Script is called with the following arguments: action zone challenge
    action     can be "add" or "delete"
    zone       in the form _acme-challenge.domain.
               domains are extracted from CSR
    challenge  given by the ACME server, to be set as a TXT record

Example:
  | path/to/ \\
  |   --account-key=./account.key \\
  |   --csr=./domain.csr \\
  |   --contact=contact@domain \\
  |   --ttl=300 \\
  |   --script=./update-dns-record > signed.crt

Example Crontab Renewal (once per month):
  0 0 1 * * path/to/acme_dns_tiny.py --account-key /path/to/account.key \
      --csr /path/to/domain.csr > /path/to/signed.crt \
      2>> /var/log/acme_tiny.log

Example of called script:
  #!/bin/bash
  case "$1" in
    add)
      ssh authoritative.dns.host nsupdate \
        <<<$"server ::1\\nupdate add $2 60 TXT $3\\nsend";;
    delete)
      ssh authoritative.dns.host nsupdate \
        <<<$"server ::1\\nupdate delete $2 TXT";;
  esac
"""
import subprocess
import json
import base64
import time
import hashlib
import re
import logging
import collections
import contextlib

import docopt
import requests
import dns
import dns.exception
import dns.resolver as resolver

ACME_DEFAULT_DIRECTORY = 'https://acme-v02.api.letsencrypt.org/directory'

# consistent logging: global instance shared across all this module
log = logging.getLogger('acme_dns_tiny')

# Result of a signed request: HTTP status code, response headers, decoded
# JSON body ({} when the body is not JSON) and raw body text.
SignedResponse = collections.namedtuple(
    'sreq', ['code', 'headers', 'map', 'text'])


def b64(b):
    """Encode bytes as unpadded URL-safe base64, as the ACME RFC requires."""
    return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")


def b64sha256(payload):
    """Return the b64() encoding of the SHA-256 digest of a str payload."""
    return b64(hashlib.sha256(payload.encode('utf8')).digest())


def b64json(obj):
    """Return the b64() encoding of obj serialized as JSON."""
    return b64(json.dumps(obj).encode("utf8"))


def _retry_delay(headers, default=5):
    """Return the Retry-After delay in seconds, or default if unusable."""
    try:
        return max(0, int(headers.get('Retry-After', default)))
    except (TypeError, ValueError):
        # header values are strings and may be an HTTP-date, not seconds
        return default


def openssl(subcommand, stdin=None):
    """Run the openssl command line and return its standard output.

    Output is bytes for binary subcommands (those containing '-sign' or
    'DER') and text otherwise; a str stdin is encoded for binary ones.
    Raises subprocess.CalledProcessError on non-zero return.
    """
    binary = '-sign' in subcommand or 'DER' in subcommand
    if binary and stdin is not None:
        stdin = stdin.encode('utf8')
    return subprocess.run(
        ['openssl'] + subcommand, input=stdin, stdout=subprocess.PIPE,
        check=True, encoding=None if binary else 'utf8',
    ).stdout


def dns_script(script=None, action=None, zone=None, content=None):
    """Call the user script as: script action zone content.

    Raises subprocess.CalledProcessError when the script fails.
    """
    log.info('Calling script: "%s %s %s %s"', script, action, zone, content)
    subprocess.run([script, action, zone, content], check=True)


def extract_domains_from_csr(csr_path):
    """Return the set of domains (subject CN and SAN DNS names) of a CSR.

    Raises ValueError when the CSR names no domain at all.
    """
    log.info('Extract domains from CSR.')
    csr = openssl(['req', '-in', csr_path, '-noout', '-text'])
    san = re.findall(
        r'X509v3 Subject Alternative Name:\s+([^\r\n]+)\r?\n',
        csr, re.MULTILINE)
    domains = set(
        re.findall(r'Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)', csr)
        + re.findall(r'DNS:\s*(\S+)\s*(?:,|$)', san[0] if san else ''))
    if not domains:
        raise ValueError('No domain to validate in the provided CSR.')
    return domains


def self_challenge_check(*, domain=None, ttl=None, challenge=None):
    """Wait until the TXT record of domain holds challenge.

    Sleeps ttl seconds before each of at most 10 DNS lookups; raises
    ValueError when the challenge is never seen.
    """
    log.info('Self testing %s with challenge "%s"', domain, challenge)
    # dnspython >= 2.0 deprecates query() in favour of resolve()
    lookup = getattr(resolver, 'resolve', None) or resolver.query
    for checknum in range(10):
        log.info('Sleep 1 TTL (%ss) before self check', ttl)
        time.sleep(ttl)
        log.debug('Self check number %s', checknum)
        try:
            responses = lookup(domain, rdtype='TXT').rrset
        except dns.exception.DNSException as e:
            log.debug(' - DNS error: %s: %s', type(e).__name__, e)
            continue
        responses = [x.to_text() for x in responses]
        log.debug(' - Found values: %s', responses)
        if f'"{challenge}"' in responses:
            break
    else:
        raise ValueError(f'Challenge not found: {challenge}')


class ACME:
    """Minimal ACME session: account key, directory and signed requests."""

    def __init__(self, *, account_key_path=None):
        """Store the account key path; no I/O until init_sreqs()."""
        self.directory = None
        self.account_key_path = account_key_path
        self.dirheaders = headers = {
            'User-Agent': 'acme-dns-tiny/2.1',
            'Accept-Language': 'en',
        }
        self.headers = {**headers, 'Content-Type': 'application/jose+json'}
        self.jws_header = None
        self.jwk_thumbprint = None

    def generate_jws_header(self, account_key_path):
        """Build the JWS protected header and JWK thumbprint from the key.

        Raises ValueError when the openssl output cannot be parsed.
        """
        log.debug('Read account key.')
        out = openssl(['rsa', '-in', account_key_path, '-noout', '-text'])
        match = re.search(
            r'modulus:\r?\n\s+00:([a-f0-9\:\s]+?)'
            r'\r?\npublicExponent: ([0-9]+)',
            out, re.MULTILINE | re.DOTALL)
        if match is None:
            raise ValueError(
                'Unable to read RSA public key from account key.')
        pub_hex, pub_exp = match.groups()
        self.jws_header = header = {
            'alg': 'RS256',
            'jwk': {
                'e': b64(int(pub_exp).to_bytes(100, 'big').lstrip(b'\0')),
                'kty': 'RSA',
                'n': b64(bytes.fromhex(re.sub(r'(\s|:)', '', pub_hex))),
            },
            'nonce': None,
        }
        # thumbprint input is the JWK with sorted keys and no whitespace
        self.jwk_thumbprint = b64sha256(json.dumps(
            header['jwk'], sort_keys=True, separators=(',', ':')))

    def init_sreqs(self, directory_url=None):
        """Read the account key, fetch the directory and a first nonce."""
        self.generate_jws_header(self.account_key_path)
        log.info('Fetch information from the ACME directory.')
        self.directory = dirmap = requests.get(
            directory_url, headers=self.dirheaders).json()
        log.info('Request first nonce')
        self.jws_header['nonce'] = requests.get(
            dirmap["newNonce"], headers=self.dirheaders,
        ).headers['Replay-Nonce']

    def sreq(self, url=None, payload=None, headers=None):
        """Sends signed requests to ACME server.

        payload=None sends a POST-as-GET. Returns a SignedResponse of
        (code, headers, map, text). Raises ValueError when the status is
        neither 200 nor 201, and re-raises the requests exception when the
        server sent no response at all.
        """
        # POST-as-GET (payload=None != {}), payload64 is an empty string
        payload64 = '' if payload is None else b64json(payload)
        protected64 = b64json({**self.jws_header, 'url': url})
        signature = openssl(
            ["dgst", "-sha256", "-sign", self.account_key_path],
            f'{protected64}.{payload64}')
        jose = {
            'protected': protected64,
            'payload': payload64,
            'signature': b64(signature),
        }
        try:
            req = requests.post(
                url, json=jose, headers={**self.headers, **(headers or {})})
        except requests.exceptions.RequestException as error:
            if error.response is None:
                # nothing to inspect (e.g. connection error)
                raise
            req = error.response
        # keep the previous nonce when the response carries none, so that
        # the status check below can still report the real error
        nonce = req.headers.get('Replay-Nonce')
        if nonce:
            self.jws_header['nonce'] = nonce
        jmap = {}
        with contextlib.suppress(ValueError):
            jmap = req.json()
        if req.status_code not in (200, 201):
            hint = req.headers.get('Link', 'empty')
            raise ValueError(
                f'Request failed: {req.status_code} {jmap}, hint: {hint}.')
        return SignedResponse(req.status_code, req.headers, jmap, req.text)

    def register_account(self, *, contacts=None):
        """Register or log in the account; update contacts if they differ.

        After this call requests are signed with the account URL ('kid')
        instead of the full JWK.
        """
        log.info('Register/Login ACME Account.')
        srv_terms = self.directory.get('meta', {}).get('termsOfService', '')
        if srv_terms:
            log.warning('Terms of service auto agreed: %s', srv_terms)
        account_request = {}
        if srv_terms:
            account_request['termsOfServiceAgreed'] = True
        if contacts:
            account_request['contact'] = contacts
        sreq = self.sreq(self.directory['newAccount'], account_request)
        self.jws_header['kid'] = kid = sreq.headers['Location']
        del self.jws_header['jwk']
        account = sreq.map
        if sreq.code == 201:
            log.info(' - Registered a new account: "%s"', kid)
        elif sreq.code == 200:
            log.debug(' - Account is already registered: "%s"', kid)
            account = self.sreq(kid, {}).map
        known_contacts = account.get('contact') or []
        if contacts and set(contacts) != set(known_contacts):
            self.sreq(kid, account_request)
            log.info(' - Account updated with latest contact information.')

    def new_order(self, *, domains=None):
        """Create an order for domains; return (order map, order URL).

        The 'authorizations' list is emptied when the order is already
        ready. Raises ValueError on an unexpected code or status.
        """
        log.info('Request a new ACME order.')
        identifiers = [{'type': 'dns', 'value': x} for x in domains]
        sreq = self.sreq(
            self.directory['newOrder'], {'identifiers': identifiers})
        # note: standard says only 201 is returned, assume 200 is not ok
        if sreq.code != 201 or sreq.map['status'] not in ('pending', 'ready'):
            raise ValueError(f'newOrder failed: {sreq.code} {sreq.map}')
        order_location = sreq.headers['Location']
        log.debug(' - Order received: %s', order_location)
        if sreq.map['status'] == 'ready':
            log.info('No challenge to process: order is already ready.')
            sreq.map['authorizations'] = []
        return sreq.map, order_location

    def validate_order(self, finalize_url, order_location, csr_der):
        """Finalize the order with the CSR and poll until it is valid.

        Returns the final order map; raises ValueError on any status other
        than 'processing' or 'valid'.
        """
        log.info('Request to finalize the order (all chalenge completed)')
        self.sreq(finalize_url, {'csr': csr_der})
        while True:
            sreq = self.sreq(order_location)
            status = sreq.map['status']
            if status == 'processing':
                time.sleep(_retry_delay(sreq.headers))
            elif status == 'valid':
                log.info('Order finalized')
                break
            else:
                raise ValueError(f'Error finalizing order: {sreq.map}')
        return sreq.map

    def validate_challenge(self, *, url=None, keyauth=None):
        """Ask the server to validate a challenge and poll until valid.

        Raises ValueError when the challenge ends in any other status.
        """
        log.info('Asking ACME server to validate challenge')
        self.sreq(url, {'keyAuthorization': keyauth})
        while True:
            sreq = self.sreq(url)
            status = sreq.map['status']
            # RFC 8555 challenges go pending -> processing -> valid/invalid
            if status in ('pending', 'processing'):
                time.sleep(_retry_delay(sreq.headers))
            elif status == 'valid':
                log.info('ACME has verified challenge for domain.')
                break
            else:
                raise ValueError(f'Challenge for domain failed: {sreq.map}')

    def get_auth(self, authurl):
        """Fetch an authorization and prepare its dns-01 challenge.

        Returns (TXT record name, TXT record value, challenge URL, key
        authorization). Raises ValueError when no dns-01 challenge is
        offered.
        """
        log.info('Process challenge for authorization: %s', authurl)
        auth = self.sreq(authurl).map
        domain = auth['identifier']['value']
        challenge = next(
            (c for c in auth['challenges'] if c['type'] == 'dns-01'), None)
        if challenge is None:
            raise ValueError(f'No dns-01 challenge offered for {domain}')
        token = re.sub(r'[^A-Za-z0-9_-]', '_', challenge['token'])  # b64url
        keyauth = f'{token}.{self.jwk_thumbprint}'
        keydigest64 = b64sha256(keyauth)
        domain = f'_acme-challenge.{domain}.'
        return domain, keydigest64, challenge['url'], keyauth

    def get_certificate(self, crt_url):
        """Download and return the PEM certificate chain at crt_url."""
        log.info('Signed certificate at: %s', crt_url)
        sreq = self.sreq(
            crt_url, headers={'Accept': 'application/pem-certificate-chain'})
        log.info(' - Certificate links given by server: %s',
                 sreq.headers.get('link', ''))
        return sreq.text


def get_crt(args):
    """Run the whole ACME flow for the docopt args; return the PEM chain."""
    ttl = int(args['--ttl'] or "60")
    domains = extract_domains_from_csr(args['--csr'])
    # ACME contacts are URLs: a bare mail address gets the mailto: scheme
    contacts = [c if ':' in c else f'mailto:{c}'
                for c in args['--contact'] or []]
    acme = ACME(account_key_path=args['--account-key'])
    acme.init_sreqs(args['--acme-directory'] or ACME_DEFAULT_DIRECTORY)
    acme.register_account(contacts=contacts)
    order, order_location = acme.new_order(domains=domains)
    log.info('Completing each each authorization challenge')
    for authurl in order['authorizations']:
        domain, keydigest64, challenge_url, keyauth = acme.get_auth(authurl)
        dns_script(args['--script'], 'add', domain, keydigest64)
        try:
            # the self check is inside the try so that the TXT record is
            # removed even when the record never becomes visible
            self_challenge_check(
                domain=domain, ttl=ttl, challenge=keydigest64)
            acme.validate_challenge(url=challenge_url, keyauth=keyauth)
        finally:
            dns_script(args['--script'], 'delete', domain, '')
    csr_der = b64(openssl(['req', '-in', args['--csr'], '-outform', 'DER']))
    order = acme.validate_order(order['finalize'], order_location, csr_der)
    return acme.get_certificate(order['certificate'])


def main(args):
    """Entry point: args is a docopt mapping, or None to parse sys.argv."""
    args = args or docopt.docopt(__doc__)
    log.addHandler(logging.StreamHandler())
    log.setLevel(
        (args['--verbose'] and logging.DEBUG)
        or (args['--quiet'] and logging.ERROR)
        or logging.INFO
    )
    print(get_crt(args), end='')


if __name__ == "__main__":
    main(None)