From 89f066b81671df29772be31804af3c531f58cec1 Mon Sep 17 00:00:00 2001 From: vg Date: Tue, 21 May 2019 15:35:48 +0200 Subject: Initial commit --- .gitignore | 8 + acme_dns_tiny.py | 326 ++++++++++++++++++++++++++++++ doc/ssh_api_example/update-acme-challenge | 59 ++++++ license.txt | 23 +++ tests/test_acme_dns_tiny.py | 132 ++++++++++++ tests/test_script.sh | 9 + 6 files changed, 557 insertions(+) create mode 100644 .gitignore create mode 100644 acme_dns_tiny.py create mode 100755 doc/ssh_api_example/update-acme-challenge create mode 100644 license.txt create mode 100644 tests/test_acme_dns_tiny.py create mode 100755 tests/test_script.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1d74053 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +*.log +__pycache__ +*.pyc +build +*.build +cache +*.cache +*.retry diff --git a/acme_dns_tiny.py b/acme_dns_tiny.py new file mode 100644 index 0000000..1e5c505 --- /dev/null +++ b/acme_dns_tiny.py @@ -0,0 +1,326 @@ +#!/usr/bin/env python3 +# +"""\ +Tiny ACME client to get TLS certificate by responding to DNS challenges. It +uses Let's Encrypt servers by default. + +As the script requires access to your private ACME account key and dns server, +please read through it. It's about 326 lines, 225 SLOC with no line +exceeding 80cols. + +Usage: + acme_tiny_dns.py [--contact=MAIL]... [--quiet] [--verbose] + [--acme-directory=URL] [--ttl=SECONDS] + (--account-key=PATH) (--csr=PATH) (--script=PATH) + acme_tiny_dns.py -h|--help + +Options: + + --account-key=PATH path to your Let's Encrypt account private key + --csr=PATH path to your certificate signing request + --quiet suppress output except for errors + --verbose show all debug information on stderr + --acme-directory=URL where to make acme request, default is Let's Encrypt + --contact=MAIL create/update mail contact information + --ttl=SECONDS time before (re)try self check + --script=PATH script to run to update the DNS server record + +Called script usage: + +Script is called with the following arguments: action zone challenge + + action can be "add" or "delete" + zone in the form _acme-challenge.domain. domains are extracted from CSR + challenge given by the ACME server, to be set as a TXT record + +Example: + +acme_tiny_dns.py \\ + --account-key=./account.key \\ + --csr=./domain.csr \\ + --contact=contact@domain \\ + --ttl=300 \\ + --script=./update-dns-record > signed.crt + +Example Crontab Renewal (once per month): + +0 0 1 * * acme_tiny_dns.py --account-key /path/to/account.key \ + --csr /path/to/domain.csr > /path/to/signed.crt \ + 2>> /var/log/acme_tiny.log + +Example of called script: + +#!/bin/bash +case "$1" in + add) ssh authoritative.dns.host nsupdate \ + <<<$"server ::1\\nupdate add $2 60 TXT $3\\nsend";; + delete) ssh authoritative.dns.host nsupdate \ + <<<$"server ::1\\nupdate delete $2 TXT";; +esac +""" + + +import subprocess +import json +import base64 +import time +import hashlib +import re +import logging +import collections +import contextlib + +import docopt +import requests +import dns +import dns.resolver as resolver + + +ACME_DEFAULT_DIRECTORY = \ + 'https://acme-staging-v02.api.letsencrypt.org/directory' + +# consistent logging: global instance share accross all this module +log = logging.getLogger('acme_dns_tiny') + + +def b64(b): + """Encodes string as base64 as specified in ACME RFC""" + return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=") + +def b64sha256(payload): + return b64(hashlib.sha256(payload.encode('utf8')).digest()) + +def b64json(obj): + return b64(json.dumps(obj).encode("utf8")) + +def openssl(subcommand, stdin=None): + """Run openssl command line and raise IOError on non-zero return.""" + binary = True if '-sign' in subcommand or 'DER' in subcommand else False + return subprocess.run( + ['openssl'] + subcommand, + input=stdin.encode('utf8') if binary and stdin is not None else stdin, + stdout=subprocess.PIPE, + check=True, + encoding=None if binary else 'utf8', + ).stdout + +def dns_script(script=None, action=None, zone=None, content=None): + log.info('Calling script: "%s %s %s %s"', script, action, zone, content) + subprocess.run([script, action, zone, content], check=True) + +def extract_domains_from_csr(csr_path): + log.info('Extract domains from CSR.') + csr = openssl(['req', '-in', csr_path, '-noout', '-text']) + san = re.findall( + r'X509v3 Subject Alternative Name:\s+([^\r\n]+)\r?\n', + csr, re.MULTILINE) + domains = set(re.findall(r'Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)', csr) + + re.findall(r'DNS:\s*(\S+)\s*(?:,|$)', san and san[0] or '')) + if not domains: + raise ValueError('No domain to validate in the provided CSR.') + return domains + +def self_challenge_check(*, domain=None, ttl=None, challenge=None): + log.info('Self testing %s with challenge "%s"', domain, challenge) + for checknum in range(10): + log.info('Sleep 1 TTL (%ss) before self check', ttl) + time.sleep(ttl) + log.debug('Self check number %s', checknum) + try: + responses = resolver.query(domain, rdtype='TXT').rrset + except dns.exception.DNSException as e: + log.debug(' - DNS error: %s: %s', type(e).__name__, e) + continue + responses = [x.to_text() for x in responses] + log.debug(' - Found values: %s', responses) + if f'"{challenge}"' in responses: + break + else: + raise ValueError(f'Challenge not found: {challenge}') + + +class ACME: + + def __init__(self, *, account_key_path=None): + self.directory = None + self.account_key_path = account_key_path + self.dirheaders = headers = { + 'User-Agent': 'acme-dns-tiny/2.1', + 'Accept-Language': 'en', + } + self.headers = {**headers, 'Content-Type': 'application/jose+json'} + self.jws_header = None + self.jwk_thumbprint = None + + def generate_jws_header(self, account_key_path): + log.debug('Read account key.') + out = openssl(['rsa', '-in', account_key_path, '-noout', '-text']) + pub_hex, pub_exp = re.search( + r'modulus:\r?\n\s+00:([a-f0-9\:\s]+?)' + r'\r?\npublicExponent: ([0-9]+)', + out, re.MULTILINE | re.DOTALL).groups() + self.jws_header = header = { + 'alg': 'RS256', + 'jwk': { + 'e': b64(int(pub_exp).to_bytes(100, 'big').lstrip(b'\0')), + 'kty': 'RSA', + 'n': b64(bytes.fromhex(re.sub(r'(\s|:)', '', pub_hex))), + }, + 'nonce': None, + } + self.jwk_thumbprint = b64sha256(json.dumps(header['jwk'], + sort_keys=True, separators=(',', ':'))) + + def init_sreqs(self, directory_url=None): + self.generate_jws_header(self.account_key_path) + log.info('Fetch information from the ACME directory.') + self.directory = dirmap = requests.get(directory_url, + headers=self.dirheaders).json() + log.info('Request first nonce') + self.jws_header['nonce'] = requests.get(dirmap["newNonce"] + ).headers['Replay-Nonce'] + + def sreq(self, url=None, payload=None, headers=None): + """Sends signed requests to ACME server.""" + # POST-as-GET (payload=None != {}), payload64 is an empty string + payload64 = '' if payload is None else b64json(payload) + protected64 = b64json({**self.jws_header, 'url': url}) + jose = { + 'protected': protected64, + 'payload': payload64, + 'signature': b64(openssl(["dgst", "-sha256", "-sign", + self.account_key_path], f'{protected64}.{payload64}')) + } + try: + req = requests.post(url, json=jose, + headers={**self.headers, **(headers or {})}) + except requests.exceptions.RequestException as error: + req = error.response + self.jws_header['nonce'] = req.headers['Replay-Nonce'] + jmap = {} + with contextlib.suppress(ValueError): + jmap = req.json() + if req.status_code not in (200, 201): + raise ValueError(f'Request failed: {req.status_code} {jmap}, ' + 'hint: {sreq.headers.get("Link", "empty")}.') + return collections.namedtuple('sreq', ['code', 'headers', 'map', + 'text'])(req.status_code, req.headers, jmap, req.text) + + def register_account(self, *, contacts=None): + log.info('Register/Login ACME Account.') + srv_terms = self.directory.get('meta', {}).get('termsOfService', '') + if srv_terms: + log.warning('Terms of service auto agreed: %s', srv_terms) + account_request = { + **dict({'termsOfServiceAgreed': True} if srv_terms else {}), + **dict({'contact': contacts} if contacts else {}), + } + sreq = self.sreq(self.directory['newAccount'], account_request) + self.jws_header['kid'] = kid = sreq.headers['Location'] + del self.jws_header['jwk'] + if sreq.code == 201: + log.info(' - Registered a new account: "%s"', kid) + elif sreq.code == 200: + log.debug(' - Account is already registered: "%s"', kid) + sreq = self.sreq(self.jws_header['kid'], {}).map + if contacts and (set(contacts) != set(sreq.map['contact'])): + self.sreq(self.jws_header["kid"], account_request) + log.info(' - Account updated with latest contact information.') + + def new_order(self, *, domains=None): + log.info('Request a new ACME order.') + sreq = self.sreq(self.directory['newOrder'], {'identifiers': + [{'type': 'dns', 'value': x} for x in domains]}) + # note: standard says only 201 is returned, assume 200 is not ok + if sreq.code != 201 or sreq.map['status'] not in ('pending', 'ready'): + raise ValueError(f'newOrder failed: {sreq.code} {sreq.map}') + order_location = sreq.headers['Location'] + log.debug(' - Order received: %s', order_location) + if sreq.map['status'] == 'ready': + log.info('No challenge to process: order is already ready.') + sreq.map['authorizations'] = [] + return sreq.map, order_location + + def validate_order(self, finalize_url, order_location, csr_der): + log.info('Request to finalize the order (all chalenge completed)') + self.sreq(finalize_url, {'csr': csr_der}) + while True: + sreq = self.sreq(order_location) + if sreq.map['status'] == 'processing': + time.sleep(sreq.headers.get('Retry-After', 5)) + elif sreq.map['status'] == 'valid': + log.info('Order finalized') + break + else: + raise ValueError(f'Error finalizing order: {sreq.map}') + return sreq.map + + def validate_challenge(self, *, url=None, keyauth=None): + log.info('Asking ACME server to validate challenge') + self.sreq(url, {'keyAuthorization': keyauth}) + while True: + sreq = self.sreq(url) + if sreq.map['status'] == 'pending': + time.sleep(sreq.headers.get('Retry-After', 5)) + elif sreq.map['status'] == 'valid': + log.info('ACME has verified challenge for domain.') + break + else: + raise ValueError(f'Challenge for domain failed: {sreq.map}') + + def get_auth(self, authurl): + log.info('Process challenge for authorization: %s', authurl) + sreq = self.sreq(authurl).map + domain = sreq['identifier']['value'] + challenge = [c for c in sreq['challenges'] if c['type'] == 'dns-01'][0] + token = re.sub(r'[^A-Za-z0-9_-]', '_', challenge['token']) # b64url + keyauth = f'{token}.{self.jwk_thumbprint}' + keydigest64 = b64sha256(keyauth) + domain = f'_acme-challenge.{domain}.' + return domain, keydigest64, challenge['url'], keyauth + + def get_certificate(self, crt_url, ): + log.info('Signed certificate at: %s', crt_url) + sreq = self.sreq(crt_url, + headers={'Accept': 'application/pem-certificate-chain'}) + log.info(' - Certificate links given by server: %s', + sreq.headers.get('link', '')) + return sreq.text + + +def get_crt(args): + ttl = args['--ttl'] or 60 + domains = extract_domains_from_csr(args['--csr']) + acme = ACME(account_key_path=args['--account-key']) + acme.init_sreqs(args['--acme-directory'] or ACME_DEFAULT_DIRECTORY) + acme.register_account(contacts=args['--contact']) + order, order_location = acme.new_order(domains=domains) + + log.info('Completing each each authorization challenge') + for authurl in order['authorizations']: + domain, keydigest64, challenge_url, keyauth = acme.get_auth(authurl) + dns_script(args['--script'], 'add', domain, keydigest64) + self_challenge_check(domain=domain, ttl=ttl, challenge=keydigest64) + try: + acme.validate_challenge(url=challenge_url, keyauth=keyauth) + finally: + dns_script(args['--script'], 'delete', domain, '') + + csr_der = b64(openssl(['req', '-in', args['--csr'], '-outform', 'DER'])) + order = acme.validate_order(order['finalize'], order_location, csr_der) + return acme.get_certificate(order['certificate']) + + +def main(args): + args = args or docopt.docopt(__doc__) + log.addHandler(logging.StreamHandler()) + log.setLevel( + (args['--verbose'] and logging.DEBUG) or + (args['--quiet'] and logging.ERROR) or + logging.INFO + ) + print(get_crt(args), end='') + + +if __name__ == "__main__": + main() diff --git a/doc/ssh_api_example/update-acme-challenge b/doc/ssh_api_example/update-acme-challenge new file mode 100755 index 0000000..39b90d0 --- /dev/null +++ b/doc/ssh_api_example/update-acme-challenge @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +# Copyright 2019 vg +# SPDX-License-Identifier: MIT + +'''\ +Takes json in stdin to modify a challenge in a zone txt record. + +Usage: update-acme-challenge --zones=ZONES + +Options: + --zones comma separated list of authorized zones to be changed + +Json format: +{ + "action": "add|delete", + "zone": "zone_name_to_modify", + "challenge": "mandatory only with add action: challenge", +} +''' + + +import sys +import json +import subprocess +import docopt + + +def nsupdate(zone, challenge): + content = f''' + server ::1 + del {zone} TXT + add {zone} TXT "{challenge}" + send + ''' + subprocess.run(['nsupdate'], check=True, input=content) + + +def main(): + args = docopt.docopt(__doc__) + + jsonmap = json.load(sys.stdin.read()) + + zones = [x.strip() for x in args['--zones'].split(',')] + zone = jsonmap.get('zone', '') + if zone not in zones: + raise ValueError(f'not permitted to modify zone {zone}') + + action = jsonmap.get('action', '') + if action not in ('add', 'delete'): + raise ValueError(f'bad value for action content: {action}') + + challenge = jsonmap.get('challenge', '') + if not all(x.isalnum() or x in ('+', '/') for x in challenge): + raise ValueError('bad format for challenge content') + + nsupdate(zone, challenge if action == 'add' else '') + + +main() diff --git a/license.txt b/license.txt new file mode 100644 index 0000000..f736cb7 --- /dev/null +++ b/license.txt @@ -0,0 +1,23 @@ +MIT License + +Copyright (c) 2019 vg +Copyright (c) 2016 Adrien Dorsaz +Copyright (c) 2015 Daniel Roesler + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/tests/test_acme_dns_tiny.py b/tests/test_acme_dns_tiny.py new file mode 100644 index 0000000..9a2562d --- /dev/null +++ b/tests/test_acme_dns_tiny.py @@ -0,0 +1,132 @@ +#!python3 + + +import collections +import os +import sys +import tempfile +import pytest + + +# append cwd to module importable paths (pytest-3 should be executed from main +# directory). +sys.path.append(os.path.realpath(".")) +import acme_dns_tiny + + +DOMAIN = os.getenv('TEST_DOMAIN') +ACME_STAGING_DIRECTORY = \ + 'https://acme-staging-v02.api.letsencrypt.org/directory' +CONTACT = os.getenv('TEST_CONTACT') # form: mailto:name@domain + + +def test_sanity_env(): + assert DOMAIN + assert CONTACT + + +@pytest.fixture(scope='module') +def casemap(tmpdir_factory): + + tmpdir = tmpdir_factory.mktemp("data") + + def _gen_key(name, size): + path = str(tmpdir.join(name)) + acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)]) + return path + + account_key = _gen_key('account_key', 4096) + domain_key = _gen_key('domain_key', 4096) + weak_account_key = _gen_key('weak_account_key', 1024) + weak_domain_key = _gen_key('weak_domain_key', 1024) + + default_args = { + '--acme-directory': ACME_STAGING_DIRECTORY, + '--script': 'tests/test_script.sh', + '--account-key': account_key, + #'--ttl': 60, # already the default + '--ttl': None, + '--contact': None, + #'--contact': [CONTACT], # not supported by staging directory + '--verbose': None, + '--quiet': None, + } + + def _csr(name, domain_key, subj=f'/CN={DOMAIN}'): + path = str(tmpdir.join(name)) + acme_dns_tiny.openssl([ + 'req', '-new', '-key', domain_key, '-subj', subj, '-out', path + ]) + return {**default_args, '--csr': path} + + def _san_csr(name, subj='/', san=None): + path = str(tmpdir.join(name)) + with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \ + open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig: + conf.write(orig.read()) + conf.write(f'\n[SAN]\nsubjectAltName={san}\n') + conf.flush() + acme_dns_tiny.openssl([ + 'req', '-new', '-key', domain_key, '-subj', subj, + '-reqexts', 'SAN', '-config', conf.name, '-out', path + ]) + return {**default_args, '--csr': path} + + return { + # single domain + 'single': _csr('csr_single', domain_key), + + # single, but weak account key used + 'weakaccountkey': { + **_csr('csr_simple', domain_key), + '--account-key': weak_account_key, + }, + + # single, but weak domain key used to sign csr + 'weakdomainkey': _csr('csr_weakdomain', weak_domain_key), + + # single, csr wrongly signed by account key + 'csrbyaccount': _csr('csr_byaccount', account_key), + + # wildcard domain in CN + 'wildcard': _csr('csr_wildcard', domain_key, f'/CN=*.{DOMAIN}'), + + # san only + 'sanonly': _san_csr('csr_sanonly', + san=f'DNS:{DOMAIN},DNS:www.{DOMAIN}'), + + # san and cn + 'san': _san_csr('csr_san', subj=f'/CN={DOMAIN}', + san=f'DNS:www.{DOMAIN}'), + + # san wildcard (domain + *.domain, contrary to wildcard in cn) + 'wildcardsan': _san_csr('csr_wildcardsan', + san=f'DNS:{DOMAIN},DNS:*.{DOMAIN}') + } + + +def assert_certificate_chain(captured): + #assert not captured.err + certlist = captured.out.split('-----BEGIN CERTIFICATE-----') + assert len(certlist) == 3 + assert certlist[0] == '' + assert '-----END CERTIFICATE-----\n' in certlist[1] + assert '-----END CERTIFICATE-----\n' in certlist[2] + textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'], + stdin=captured.out) + assert 'Issuer' in textchain + + +def test_success(capsys, casemap): + for name in ('single', 'wildcard', 'sanonly', 'san', 'wildcardsan'): + print(f'test_success case: {name}', file=sys.stderr) + acme_dns_tiny.main(casemap[name]) + captured = capsys.readouterr() + assert_certificate_chain(captured) + + +def test_assert(casemap): + for name in ('weakaccountkey', 'weakdomainkey', 'csrbyaccount'): + print(f'test_assert case: {name}', file=sys.stderr) + with pytest.raises(ValueError): + acme_dns_tiny.main(casemap[name]) diff --git a/tests/test_script.sh b/tests/test_script.sh new file mode 100755 index 0000000..dbd6812 --- /dev/null +++ b/tests/test_script.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +ssh -i "${TEST_SSHKEY}" "${TEST_USER}@${TEST_DNSSERVER}" : <