summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvg <vgm+dev@devys.org>2019-05-21 15:35:48 +0200
committervg <vgm+dev@devys.org>2019-05-21 15:35:48 +0200
commit89f066b81671df29772be31804af3c531f58cec1 (patch)
tree93c403e3c4ab0141345362abe293485bd18f9439
downloadacme-dns-tiny-89f066b81671df29772be31804af3c531f58cec1.tar.gz
acme-dns-tiny-89f066b81671df29772be31804af3c531f58cec1.tar.bz2
acme-dns-tiny-89f066b81671df29772be31804af3c531f58cec1.zip
Initial commit
-rw-r--r--.gitignore8
-rw-r--r--acme_dns_tiny.py326
-rwxr-xr-xdoc/ssh_api_example/update-acme-challenge59
-rw-r--r--license.txt23
-rw-r--r--tests/test_acme_dns_tiny.py132
-rwxr-xr-xtests/test_script.sh9
6 files changed, 557 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..1d74053
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,8 @@
+*.log
+__pycache__
+*.pyc
+build
+*.build
+cache
+*.cache
+*.retry
diff --git a/acme_dns_tiny.py b/acme_dns_tiny.py
new file mode 100644
index 0000000..1e5c505
--- /dev/null
+++ b/acme_dns_tiny.py
@@ -0,0 +1,326 @@
+#!/usr/bin/env python3
+#
+"""\
+Tiny ACME client to get TLS certificate by responding to DNS challenges. It
+uses Let's Encrypt servers by default.
+
+As the script requires access to your private ACME account key and dns server,
+please read through it. It's about 326 lines, 225 SLOC with no line
+exceeding 80cols.
+
+Usage:
+ acme_tiny_dns.py [--contact=MAIL]... [--quiet] [--verbose]
+ [--acme-directory=URL] [--ttl=SECONDS]
+ (--account-key=PATH) (--csr=PATH) (--script=PATH)
+ acme_tiny_dns.py -h|--help
+
+Options:
+
+ --account-key=PATH path to your Let's Encrypt account private key
+ --csr=PATH path to your certificate signing request
+ --quiet suppress output except for errors
+ --verbose show all debug information on stderr
+ --acme-directory=URL where to make acme request, default is Let's Encrypt
+ --contact=MAIL create/update mail contact information
+ --ttl=SECONDS time before (re)try self check
+ --script=PATH script to run to update the DNS server record
+
+Called script usage:
+
+Script is called with the following arguments: action zone challenge
+
+ action can be "add" or "delete"
+ zone in the form _acme-challenge.domain. domains are extracted from CSR
+ challenge given by the ACME server, to be set as a TXT record
+
+Example:
+
+acme_tiny_dns.py \\
+ --account-key=./account.key \\
+ --csr=./domain.csr \\
+ --contact=contact@domain \\
+ --ttl=300 \\
+ --script=./update-dns-record > signed.crt
+
+Example Crontab Renewal (once per month):
+
+0 0 1 * * acme_tiny_dns.py --account-key /path/to/account.key \
+ --csr /path/to/domain.csr > /path/to/signed.crt \
+ 2>> /var/log/acme_tiny.log
+
+Example of called script:
+
+#!/bin/bash
+case "$1" in
+ add) ssh authoritative.dns.host nsupdate \
+ <<<$"server ::1\\nupdate add $2 60 TXT $3\\nsend";;
+ delete) ssh authoritative.dns.host nsupdate \
+ <<<$"server ::1\\nupdate delete $2 TXT";;
+esac
+"""
+
+
+import subprocess
+import json
+import base64
+import time
+import hashlib
+import re
+import logging
+import collections
+import contextlib
+
+import docopt
+import requests
+import dns
+import dns.resolver as resolver
+
+
+ACME_DEFAULT_DIRECTORY = \
+ 'https://acme-staging-v02.api.letsencrypt.org/directory'
+
+# consistent logging: global instance share accross all this module
+log = logging.getLogger('acme_dns_tiny')
+
+
+def b64(b):
+ """Encodes string as base64 as specified in ACME RFC"""
+ return base64.urlsafe_b64encode(b).decode("utf8").rstrip("=")
+
+def b64sha256(payload):
+ return b64(hashlib.sha256(payload.encode('utf8')).digest())
+
+def b64json(obj):
+ return b64(json.dumps(obj).encode("utf8"))
+
+def openssl(subcommand, stdin=None):
+ """Run openssl command line and raise IOError on non-zero return."""
+ binary = True if '-sign' in subcommand or 'DER' in subcommand else False
+ return subprocess.run(
+ ['openssl'] + subcommand,
+ input=stdin.encode('utf8') if binary and stdin is not None else stdin,
+ stdout=subprocess.PIPE,
+ check=True,
+ encoding=None if binary else 'utf8',
+ ).stdout
+
+def dns_script(script=None, action=None, zone=None, content=None):
+ log.info('Calling script: "%s %s %s %s"', script, action, zone, content)
+ subprocess.run([script, action, zone, content], check=True)
+
+def extract_domains_from_csr(csr_path):
+ log.info('Extract domains from CSR.')
+ csr = openssl(['req', '-in', csr_path, '-noout', '-text'])
+ san = re.findall(
+ r'X509v3 Subject Alternative Name:\s+([^\r\n]+)\r?\n',
+ csr, re.MULTILINE)
+ domains = set(re.findall(r'Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)', csr)
+ + re.findall(r'DNS:\s*(\S+)\s*(?:,|$)', san and san[0] or ''))
+ if not domains:
+ raise ValueError('No domain to validate in the provided CSR.')
+ return domains
+
+def self_challenge_check(*, domain=None, ttl=None, challenge=None):
+ log.info('Self testing %s with challenge "%s"', domain, challenge)
+ for checknum in range(10):
+ log.info('Sleep 1 TTL (%ss) before self check', ttl)
+ time.sleep(ttl)
+ log.debug('Self check number %s', checknum)
+ try:
+ responses = resolver.query(domain, rdtype='TXT').rrset
+ except dns.exception.DNSException as e:
+ log.debug(' - DNS error: %s: %s', type(e).__name__, e)
+ continue
+ responses = [x.to_text() for x in responses]
+ log.debug(' - Found values: %s', responses)
+ if f'"{challenge}"' in responses:
+ break
+ else:
+ raise ValueError(f'Challenge not found: {challenge}')
+
+
+class ACME:
+
+ def __init__(self, *, account_key_path=None):
+ self.directory = None
+ self.account_key_path = account_key_path
+ self.dirheaders = headers = {
+ 'User-Agent': 'acme-dns-tiny/2.1',
+ 'Accept-Language': 'en',
+ }
+ self.headers = {**headers, 'Content-Type': 'application/jose+json'}
+ self.jws_header = None
+ self.jwk_thumbprint = None
+
+ def generate_jws_header(self, account_key_path):
+ log.debug('Read account key.')
+ out = openssl(['rsa', '-in', account_key_path, '-noout', '-text'])
+ pub_hex, pub_exp = re.search(
+ r'modulus:\r?\n\s+00:([a-f0-9\:\s]+?)'
+ r'\r?\npublicExponent: ([0-9]+)',
+ out, re.MULTILINE | re.DOTALL).groups()
+ self.jws_header = header = {
+ 'alg': 'RS256',
+ 'jwk': {
+ 'e': b64(int(pub_exp).to_bytes(100, 'big').lstrip(b'\0')),
+ 'kty': 'RSA',
+ 'n': b64(bytes.fromhex(re.sub(r'(\s|:)', '', pub_hex))),
+ },
+ 'nonce': None,
+ }
+ self.jwk_thumbprint = b64sha256(json.dumps(header['jwk'],
+ sort_keys=True, separators=(',', ':')))
+
+ def init_sreqs(self, directory_url=None):
+ self.generate_jws_header(self.account_key_path)
+ log.info('Fetch information from the ACME directory.')
+ self.directory = dirmap = requests.get(directory_url,
+ headers=self.dirheaders).json()
+ log.info('Request first nonce')
+ self.jws_header['nonce'] = requests.get(dirmap["newNonce"]
+ ).headers['Replay-Nonce']
+
+ def sreq(self, url=None, payload=None, headers=None):
+ """Sends signed requests to ACME server."""
+ # POST-as-GET (payload=None != {}), payload64 is an empty string
+ payload64 = '' if payload is None else b64json(payload)
+ protected64 = b64json({**self.jws_header, 'url': url})
+ jose = {
+ 'protected': protected64,
+ 'payload': payload64,
+ 'signature': b64(openssl(["dgst", "-sha256", "-sign",
+ self.account_key_path], f'{protected64}.{payload64}'))
+ }
+ try:
+ req = requests.post(url, json=jose,
+ headers={**self.headers, **(headers or {})})
+ except requests.exceptions.RequestException as error:
+ req = error.response
+ self.jws_header['nonce'] = req.headers['Replay-Nonce']
+ jmap = {}
+ with contextlib.suppress(ValueError):
+ jmap = req.json()
+ if req.status_code not in (200, 201):
+ raise ValueError(f'Request failed: {req.status_code} {jmap}, '
+ 'hint: {sreq.headers.get("Link", "empty")}.')
+ return collections.namedtuple('sreq', ['code', 'headers', 'map',
+ 'text'])(req.status_code, req.headers, jmap, req.text)
+
+ def register_account(self, *, contacts=None):
+ log.info('Register/Login ACME Account.')
+ srv_terms = self.directory.get('meta', {}).get('termsOfService', '')
+ if srv_terms:
+ log.warning('Terms of service auto agreed: %s', srv_terms)
+ account_request = {
+ **dict({'termsOfServiceAgreed': True} if srv_terms else {}),
+ **dict({'contact': contacts} if contacts else {}),
+ }
+ sreq = self.sreq(self.directory['newAccount'], account_request)
+ self.jws_header['kid'] = kid = sreq.headers['Location']
+ del self.jws_header['jwk']
+ if sreq.code == 201:
+ log.info(' - Registered a new account: "%s"', kid)
+ elif sreq.code == 200:
+ log.debug(' - Account is already registered: "%s"', kid)
+ sreq = self.sreq(self.jws_header['kid'], {}).map
+ if contacts and (set(contacts) != set(sreq.map['contact'])):
+ self.sreq(self.jws_header["kid"], account_request)
+ log.info(' - Account updated with latest contact information.')
+
+ def new_order(self, *, domains=None):
+ log.info('Request a new ACME order.')
+ sreq = self.sreq(self.directory['newOrder'], {'identifiers':
+ [{'type': 'dns', 'value': x} for x in domains]})
+ # note: standard says only 201 is returned, assume 200 is not ok
+ if sreq.code != 201 or sreq.map['status'] not in ('pending', 'ready'):
+ raise ValueError(f'newOrder failed: {sreq.code} {sreq.map}')
+ order_location = sreq.headers['Location']
+ log.debug(' - Order received: %s', order_location)
+ if sreq.map['status'] == 'ready':
+ log.info('No challenge to process: order is already ready.')
+ sreq.map['authorizations'] = []
+ return sreq.map, order_location
+
+ def validate_order(self, finalize_url, order_location, csr_der):
+ log.info('Request to finalize the order (all chalenge completed)')
+ self.sreq(finalize_url, {'csr': csr_der})
+ while True:
+ sreq = self.sreq(order_location)
+ if sreq.map['status'] == 'processing':
+ time.sleep(sreq.headers.get('Retry-After', 5))
+ elif sreq.map['status'] == 'valid':
+ log.info('Order finalized')
+ break
+ else:
+ raise ValueError(f'Error finalizing order: {sreq.map}')
+ return sreq.map
+
+ def validate_challenge(self, *, url=None, keyauth=None):
+ log.info('Asking ACME server to validate challenge')
+ self.sreq(url, {'keyAuthorization': keyauth})
+ while True:
+ sreq = self.sreq(url)
+ if sreq.map['status'] == 'pending':
+ time.sleep(sreq.headers.get('Retry-After', 5))
+ elif sreq.map['status'] == 'valid':
+ log.info('ACME has verified challenge for domain.')
+ break
+ else:
+ raise ValueError(f'Challenge for domain failed: {sreq.map}')
+
+ def get_auth(self, authurl):
+ log.info('Process challenge for authorization: %s', authurl)
+ sreq = self.sreq(authurl).map
+ domain = sreq['identifier']['value']
+ challenge = [c for c in sreq['challenges'] if c['type'] == 'dns-01'][0]
+ token = re.sub(r'[^A-Za-z0-9_-]', '_', challenge['token']) # b64url
+ keyauth = f'{token}.{self.jwk_thumbprint}'
+ keydigest64 = b64sha256(keyauth)
+ domain = f'_acme-challenge.{domain}.'
+ return domain, keydigest64, challenge['url'], keyauth
+
+ def get_certificate(self, crt_url, ):
+ log.info('Signed certificate at: %s', crt_url)
+ sreq = self.sreq(crt_url,
+ headers={'Accept': 'application/pem-certificate-chain'})
+ log.info(' - Certificate links given by server: %s',
+ sreq.headers.get('link', ''))
+ return sreq.text
+
+
+def get_crt(args):
+ ttl = args['--ttl'] or 60
+ domains = extract_domains_from_csr(args['--csr'])
+ acme = ACME(account_key_path=args['--account-key'])
+ acme.init_sreqs(args['--acme-directory'] or ACME_DEFAULT_DIRECTORY)
+ acme.register_account(contacts=args['--contact'])
+ order, order_location = acme.new_order(domains=domains)
+
+ log.info('Completing each each authorization challenge')
+ for authurl in order['authorizations']:
+ domain, keydigest64, challenge_url, keyauth = acme.get_auth(authurl)
+ dns_script(args['--script'], 'add', domain, keydigest64)
+ self_challenge_check(domain=domain, ttl=ttl, challenge=keydigest64)
+ try:
+ acme.validate_challenge(url=challenge_url, keyauth=keyauth)
+ finally:
+ dns_script(args['--script'], 'delete', domain, '')
+
+ csr_der = b64(openssl(['req', '-in', args['--csr'], '-outform', 'DER']))
+ order = acme.validate_order(order['finalize'], order_location, csr_der)
+ return acme.get_certificate(order['certificate'])
+
+
+def main(args):
+ args = args or docopt.docopt(__doc__)
+ log.addHandler(logging.StreamHandler())
+ log.setLevel(
+ (args['--verbose'] and logging.DEBUG) or
+ (args['--quiet'] and logging.ERROR) or
+ logging.INFO
+ )
+ print(get_crt(args), end='')
+
+
+if __name__ == "__main__":
+ main()
diff --git a/doc/ssh_api_example/update-acme-challenge b/doc/ssh_api_example/update-acme-challenge
new file mode 100755
index 0000000..39b90d0
--- /dev/null
+++ b/doc/ssh_api_example/update-acme-challenge
@@ -0,0 +1,59 @@
+#!/usr/bin/env python3
+# Copyright 2019 vg
+# SPDX-License-Identifier: MIT
+
+'''\
+Takes json in stdin to modify a challenge in a zone txt record.
+
+Usage: update-acme-challenge --zones=ZONES
+
+Options:
+ --zones comma separated list of authorized zones to be changed
+
+Json format:
+{
+ "action": "add|delete",
+ "zone": "zone_name_to_modify",
+ "challenge": "mandatory only with add action: challenge",
+}
+'''
+
+
+import sys
+import json
+import subprocess
+import docopt
+
+
+def nsupdate(zone, challenge):
+ content = f'''
+ server ::1
+ del {zone} TXT
+ add {zone} TXT "{challenge}"
+ send
+ '''
+ subprocess.run(['nsupdate'], check=True, input=content)
+
+
+def main():
+ args = docopt.docopt(__doc__)
+
+ jsonmap = json.load(sys.stdin.read())
+
+ zones = [x.strip() for x in args['--zones'].split(',')]
+ zone = jsonmap.get('zone', '')
+ if zone not in zones:
+ raise ValueError(f'not permitted to modify zone {zone}')
+
+ action = jsonmap.get('action', '')
+ if action not in ('add', 'delete'):
+ raise ValueError(f'bad value for action content: {action}')
+
+ challenge = jsonmap.get('challenge', '')
+ if not all(x.isalnum() or x in ('+', '/') for x in challenge):
+ raise ValueError('bad format for challenge content')
+
+ nsupdate(zone, challenge if action == 'add' else '')
+
+
+main()
diff --git a/license.txt b/license.txt
new file mode 100644
index 0000000..f736cb7
--- /dev/null
+++ b/license.txt
@@ -0,0 +1,23 @@
+MIT License
+
+Copyright (c) 2019 vg
+Copyright (c) 2016 Adrien Dorsaz
+Copyright (c) 2015 Daniel Roesler
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/tests/test_acme_dns_tiny.py b/tests/test_acme_dns_tiny.py
new file mode 100644
index 0000000..9a2562d
--- /dev/null
+++ b/tests/test_acme_dns_tiny.py
@@ -0,0 +1,132 @@
+#!python3
+
+
+import collections
+import os
+import sys
+import tempfile
+import pytest
+
+
+# append cwd to module importable paths (pytest-3 should be executed from main
+# directory).
+sys.path.append(os.path.realpath("."))
+import acme_dns_tiny
+
+
+DOMAIN = os.getenv('TEST_DOMAIN')
+ACME_STAGING_DIRECTORY = \
+ 'https://acme-staging-v02.api.letsencrypt.org/directory'
+CONTACT = os.getenv('TEST_CONTACT') # form: mailto:name@domain
+
+
+def test_sanity_env():
+ assert DOMAIN
+ assert CONTACT
+
+
+@pytest.fixture(scope='module')
+def casemap(tmpdir_factory):
+
+ tmpdir = tmpdir_factory.mktemp("data")
+
+ def _gen_key(name, size):
+ path = str(tmpdir.join(name))
+ acme_dns_tiny.openssl(['genrsa', '-out', path, str(size)])
+ return path
+
+ account_key = _gen_key('account_key', 4096)
+ domain_key = _gen_key('domain_key', 4096)
+ weak_account_key = _gen_key('weak_account_key', 1024)
+ weak_domain_key = _gen_key('weak_domain_key', 1024)
+
+ default_args = {
+ '--acme-directory': ACME_STAGING_DIRECTORY,
+ '--script': 'tests/test_script.sh',
+ '--account-key': account_key,
+ #'--ttl': 60, # already the default
+ '--ttl': None,
+ '--contact': None,
+ #'--contact': [CONTACT], # not supported by staging directory
+ '--verbose': None,
+ '--quiet': None,
+ }
+
+ def _csr(name, domain_key, subj=f'/CN={DOMAIN}'):
+ path = str(tmpdir.join(name))
+ acme_dns_tiny.openssl([
+ 'req', '-new', '-key', domain_key, '-subj', subj, '-out', path
+ ])
+ return {**default_args, '--csr': path}
+
+ def _san_csr(name, subj='/', san=None):
+ path = str(tmpdir.join(name))
+ with tempfile.NamedTemporaryFile('w', encoding='utf8') as conf, \
+ open('/etc/ssl/openssl.cnf', 'r', encoding='utf8') as orig:
+ conf.write(orig.read())
+ conf.write(f'\n[SAN]\nsubjectAltName={san}\n')
+ conf.flush()
+ acme_dns_tiny.openssl([
+ 'req', '-new', '-key', domain_key, '-subj', subj,
+ '-reqexts', 'SAN', '-config', conf.name, '-out', path
+ ])
+ return {**default_args, '--csr': path}
+
+ return {
+ # single domain
+ 'single': _csr('csr_single', domain_key),
+
+ # single, but weak account key used
+ 'weakaccountkey': {
+ **_csr('csr_simple', domain_key),
+ '--account-key': weak_account_key,
+ },
+
+ # single, but weak domain key used to sign csr
+ 'weakdomainkey': _csr('csr_weakdomain', weak_domain_key),
+
+ # single, csr wrongly signed by account key
+ 'csrbyaccount': _csr('csr_byaccount', account_key),
+
+ # wildcard domain in CN
+ 'wildcard': _csr('csr_wildcard', domain_key, f'/CN=*.{DOMAIN}'),
+
+ # san only
+ 'sanonly': _san_csr('csr_sanonly',
+ san=f'DNS:{DOMAIN},DNS:www.{DOMAIN}'),
+
+ # san and cn
+ 'san': _san_csr('csr_san', subj=f'/CN={DOMAIN}',
+ san=f'DNS:www.{DOMAIN}'),
+
+ # san wildcard (domain + *.domain, contrary to wildcard in cn)
+ 'wildcardsan': _san_csr('csr_wildcardsan',
+ san=f'DNS:{DOMAIN},DNS:*.{DOMAIN}')
+ }
+
+
+def assert_certificate_chain(captured):
+ #assert not captured.err
+ certlist = captured.out.split('-----BEGIN CERTIFICATE-----')
+ assert len(certlist) == 3
+ assert certlist[0] == ''
+ assert '-----END CERTIFICATE-----\n' in certlist[1]
+ assert '-----END CERTIFICATE-----\n' in certlist[2]
+ textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'],
+ stdin=captured.out)
+ assert 'Issuer' in textchain
+
+
+def test_success(capsys, casemap):
+ for name in ('single', 'wildcard', 'sanonly', 'san', 'wildcardsan'):
+ print(f'test_success case: {name}', file=sys.stderr)
+ acme_dns_tiny.main(casemap[name])
+ captured = capsys.readouterr()
+ assert_certificate_chain(captured)
+
+
+def test_assert(casemap):
+ for name in ('weakaccountkey', 'weakdomainkey', 'csrbyaccount'):
+ print(f'test_assert case: {name}', file=sys.stderr)
+ with pytest.raises(ValueError):
+ acme_dns_tiny.main(casemap[name])
diff --git a/tests/test_script.sh b/tests/test_script.sh
new file mode 100755
index 0000000..dbd6812
--- /dev/null
+++ b/tests/test_script.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+ssh -i "${TEST_SSHKEY}" "${TEST_USER}@${TEST_DNSSERVER}" : <<EOF
+{
+ "action": "$1",
+ "zone": "$2",
+ "challenge": "$3"
+}
+EOF