From 89f066b81671df29772be31804af3c531f58cec1 Mon Sep 17 00:00:00 2001 From: vg Date: Tue, 21 May 2019 15:35:48 +0200 Subject: Initial commit --- acme_dns_tiny.py | 326 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 326 insertions(+) create mode 100644 acme_dns_tiny.py (limited to 'acme_dns_tiny.py') diff --git a/acme_dns_tiny.py b/acme_dns_tiny.py new file mode 100644 index 0000000..1e5c505 --- /dev/null +++ b/acme_dns_tiny.py @@ -0,0 +1,326 @@ +#!/usr/bin/env python3 +# +"""\ +Tiny ACME client to get TLS certificate by responding to DNS challenges. It +uses Let's Encrypt servers by default. + +As the script requires access to your private ACME account key and dns server, +please read through it. It's about 326 lines, 225 SLOC with no line +exceeding 80cols. + +Usage: + acme_tiny_dns.py [--contact=MAIL]... [--quiet] [--verbose] + [--acme-directory=URL] [--ttl=SECONDS] + (--account-key=PATH) (--csr=PATH) (--script=PATH) + acme_tiny_dns.py -h|--help + +Options: + + --account-key=PATH path to your Let's Encrypt account private key + --csr=PATH path to your certificate signing request + --quiet suppress output except for errors + --verbose show all debug information on stderr + --acme-directory=URL where to make acme request, default is Let's Encrypt + --contact=MAIL create/update mail contact information + --ttl=SECONDS time before (re)try self check + --script=PATH script to run to update the DNS server record + +Called script usage: + +Script is called with the following arguments: action zone challenge + + action can be "add" or "delete" + zone in the form _acme-challenge.domain. domains are extracted from CSR + challenge given by the ACME server, to be set as a TXT record + +Example: + +acme_tiny_dns.py \\ + --account-key=./account.key \\ + --csr=./domain.csr \\ + --contact=contact@domain \\ + --ttl=300 \\ + --script=./update-dns-record > signed.crt + +Example Crontab Renewal (once per month): + +0 0 1 * * acme_tiny_dns.py --account-key /path/to/account.key \ + --csr /path/to/domain.csr > /path/to/signed.crt \ + 2>> /var/log/acme_tiny.log + +Example of called script: + +#!/bin/bash +case "$1" in + add) ssh authoritative.dns.host nsupdate \ + <<<$"server ::1\\nupdate add $2 60 TXT $3\\nsend";; + delete) ssh authoritative.dns.host nsupdate \ + <<<$"server ::1\\nupdate delete $2 TXT";; +esac +""" + + +import subprocess +import json +import base64 +import time +import hashlib +import re +import logging +import collections +import contextlib + +import docopt +import requests +import dns +import dns.resolver as resolver + + +ACME_DEFAULT_DIRECTORY = \ + 'https://acme-staging-v02.api.letsencrypt.org/directory' + +# consistent logging: global instance share accross all this module +log = logging.getLogger('acme_dns_tiny') + + +def b64(b): + """Encodes string as base64 as specified in ACME RFC""" + return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=") + +def b64sha256(payload): + return b64(hashlib.sha256(payload.encode('utf8')).digest()) + +def b64json(obj): + return b64(json.dumps(obj).encode("utf8")) + +def openssl(subcommand, stdin=None): + """Run openssl command line and raise IOError on non-zero return.""" + binary = True if '-sign' in subcommand or 'DER' in subcommand else False + return subprocess.run( + ['openssl'] + subcommand, + input=stdin.encode('utf8') if binary and stdin is not None else stdin, + stdout=subprocess.PIPE, + check=True, + encoding=None if binary else 'utf8', + ).stdout + +def dns_script(script=None, action=None, zone=None, content=None): + log.info('Calling script: "%s %s %s %s"', script, action, zone, content) + subprocess.run([script, action, zone, content], check=True) + +def extract_domains_from_csr(csr_path): + log.info('Extract domains from CSR.') + csr = openssl(['req', '-in', csr_path, '-noout', '-text']) + san = re.findall( + r'X509v3 Subject Alternative Name:\s+([^\r\n]+)\r?\n', + csr, re.MULTILINE) + domains = set(re.findall(r'Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)', csr) + + re.findall(r'DNS:\s*(\S+)\s*(?:,|$)', san and san[0] or '')) + if not domains: + raise ValueError('No domain to validate in the provided CSR.') + return domains + +def self_challenge_check(*, domain=None, ttl=None, challenge=None): + log.info('Self testing %s with challenge "%s"', domain, challenge) + for checknum in range(10): + log.info('Sleep 1 TTL (%ss) before self check', ttl) + time.sleep(ttl) + log.debug('Self check number %s', checknum) + try: + responses = resolver.query(domain, rdtype='TXT').rrset + except dns.exception.DNSException as e: + log.debug(' - DNS error: %s: %s', type(e).__name__, e) + continue + responses = [x.to_text() for x in responses] + log.debug(' - Found values: %s', responses) + if f'"{challenge}"' in responses: + break + else: + raise ValueError(f'Challenge not found: {challenge}') + + +class ACME: + + def __init__(self, *, account_key_path=None): + self.directory = None + self.account_key_path = account_key_path + self.dirheaders = headers = { + 'User-Agent': 'acme-dns-tiny/2.1', + 'Accept-Language': 'en', + } + self.headers = {**headers, 'Content-Type': 'application/jose+json'} + self.jws_header = None + self.jwk_thumbprint = None + + def generate_jws_header(self, account_key_path): + log.debug('Read account key.') + out = openssl(['rsa', '-in', account_key_path, '-noout', '-text']) + pub_hex, pub_exp = re.search( + r'modulus:\r?\n\s+00:([a-f0-9\:\s]+?)' + r'\r?\npublicExponent: ([0-9]+)', + out, re.MULTILINE | re.DOTALL).groups() + self.jws_header = header = { + 'alg': 'RS256', + 'jwk': { + 'e': b64(int(pub_exp).to_bytes(100, 'big').lstrip(b'\0')), + 'kty': 'RSA', + 'n': b64(bytes.fromhex(re.sub(r'(\s|:)', '', pub_hex))), + }, + 'nonce': None, + } + self.jwk_thumbprint = b64sha256(json.dumps(header['jwk'], + sort_keys=True, separators=(',', ':'))) + + def init_sreqs(self, directory_url=None): + self.generate_jws_header(self.account_key_path) + log.info('Fetch information from the ACME directory.') + self.directory = dirmap = requests.get(directory_url, + headers=self.dirheaders).json() + log.info('Request first nonce') + self.jws_header['nonce'] = requests.get(dirmap["newNonce"] + ).headers['Replay-Nonce'] + + def sreq(self, url=None, payload=None, headers=None): + """Sends signed requests to ACME server.""" + # POST-as-GET (payload=None != {}), payload64 is an empty string + payload64 = '' if payload is None else b64json(payload) + protected64 = b64json({**self.jws_header, 'url': url}) + jose = { + 'protected': protected64, + 'payload': payload64, + 'signature': b64(openssl(["dgst", "-sha256", "-sign", + self.account_key_path], f'{protected64}.{payload64}')) + } + try: + req = requests.post(url, json=jose, + headers={**self.headers, **(headers or {})}) + except requests.exceptions.RequestException as error: + req = error.response + self.jws_header['nonce'] = req.headers['Replay-Nonce'] + jmap = {} + with contextlib.suppress(ValueError): + jmap = req.json() + if req.status_code not in (200, 201): + raise ValueError(f'Request failed: {req.status_code} {jmap}, ' + 'hint: {sreq.headers.get("Link", "empty")}.') + return collections.namedtuple('sreq', ['code', 'headers', 'map', + 'text'])(req.status_code, req.headers, jmap, req.text) + + def register_account(self, *, contacts=None): + log.info('Register/Login ACME Account.') + srv_terms = self.directory.get('meta', {}).get('termsOfService', '') + if srv_terms: + log.warning('Terms of service auto agreed: %s', srv_terms) + account_request = { + **dict({'termsOfServiceAgreed': True} if srv_terms else {}), + **dict({'contact': contacts} if contacts else {}), + } + sreq = self.sreq(self.directory['newAccount'], account_request) + self.jws_header['kid'] = kid = sreq.headers['Location'] + del self.jws_header['jwk'] + if sreq.code == 201: + log.info(' - Registered a new account: "%s"', kid) + elif sreq.code == 200: + log.debug(' - Account is already registered: "%s"', kid) + sreq = self.sreq(self.jws_header['kid'], {}).map + if contacts and (set(contacts) != set(sreq.map['contact'])): + self.sreq(self.jws_header["kid"], account_request) + log.info(' - Account updated with latest contact information.') + + def new_order(self, *, domains=None): + log.info('Request a new ACME order.') + sreq = self.sreq(self.directory['newOrder'], {'identifiers': + [{'type': 'dns', 'value': x} for x in domains]}) + # note: standard says only 201 is returned, assume 200 is not ok + if sreq.code != 201 or sreq.map['status'] not in ('pending', 'ready'): + raise ValueError(f'newOrder failed: {sreq.code} {sreq.map}') + order_location = sreq.headers['Location'] + log.debug(' - Order received: %s', order_location) + if sreq.map['status'] == 'ready': + log.info('No challenge to process: order is already ready.') + sreq.map['authorizations'] = [] + return sreq.map, order_location + + def validate_order(self, finalize_url, order_location, csr_der): + log.info('Request to finalize the order (all chalenge completed)') + self.sreq(finalize_url, {'csr': csr_der}) + while True: + sreq = self.sreq(order_location) + if sreq.map['status'] == 'processing': + time.sleep(sreq.headers.get('Retry-After', 5)) + elif sreq.map['status'] == 'valid': + log.info('Order finalized') + break + else: + raise ValueError(f'Error finalizing order: {sreq.map}') + return sreq.map + + def validate_challenge(self, *, url=None, keyauth=None): + log.info('Asking ACME server to validate challenge') + self.sreq(url, {'keyAuthorization': keyauth}) + while True: + sreq = self.sreq(url) + if sreq.map['status'] == 'pending': + time.sleep(sreq.headers.get('Retry-After', 5)) + elif sreq.map['status'] == 'valid': + log.info('ACME has verified challenge for domain.') + break + else: + raise ValueError(f'Challenge for domain failed: {sreq.map}') + + def get_auth(self, authurl): + log.info('Process challenge for authorization: %s', authurl) + sreq = self.sreq(authurl).map + domain = sreq['identifier']['value'] + challenge = [c for c in sreq['challenges'] if c['type'] == 'dns-01'][0] + token = re.sub(r'[^A-Za-z0-9_-]', '_', challenge['token']) # b64url + keyauth = f'{token}.{self.jwk_thumbprint}' + keydigest64 = b64sha256(keyauth) + domain = f'_acme-challenge.{domain}.' + return domain, keydigest64, challenge['url'], keyauth + + def get_certificate(self, crt_url, ): + log.info('Signed certificate at: %s', crt_url) + sreq = self.sreq(crt_url, + headers={'Accept': 'application/pem-certificate-chain'}) + log.info(' - Certificate links given by server: %s', + sreq.headers.get('link', '')) + return sreq.text + + +def get_crt(args): + ttl = args['--ttl'] or 60 + domains = extract_domains_from_csr(args['--csr']) + acme = ACME(account_key_path=args['--account-key']) + acme.init_sreqs(args['--acme-directory'] or ACME_DEFAULT_DIRECTORY) + acme.register_account(contacts=args['--contact']) + order, order_location = acme.new_order(domains=domains) + + log.info('Completing each each authorization challenge') + for authurl in order['authorizations']: + domain, keydigest64, challenge_url, keyauth = acme.get_auth(authurl) + dns_script(args['--script'], 'add', domain, keydigest64) + self_challenge_check(domain=domain, ttl=ttl, challenge=keydigest64) + try: + acme.validate_challenge(url=challenge_url, keyauth=keyauth) + finally: + dns_script(args['--script'], 'delete', domain, '') + + csr_der = b64(openssl(['req', '-in', args['--csr'], '-outform', 'DER'])) + order = acme.validate_order(order['finalize'], order_location, csr_der) + return acme.get_certificate(order['certificate']) + + +def main(args): + args = args or docopt.docopt(__doc__) + log.addHandler(logging.StreamHandler()) + log.setLevel( + (args['--verbose'] and logging.DEBUG) or + (args['--quiet'] and logging.ERROR) or + logging.INFO + ) + print(get_crt(args), end='') + + +if __name__ == "__main__": + main() -- cgit v1.2.3