summaryrefslogtreecommitdiffstats
path: root/acme_dns_tiny.py
blob: 4903c6391ce7d17b5e5b7a46c1f886d2e858762a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
#!/usr/bin/env python3
#
"""\
Tiny ACME client to get TLS certificate by responding to DNS challenges. It
uses Let's Encrypt servers by default.

As the script requires access to your private ACME account key and dns server,
please read through it. It's about 326 lines, 225 SLOC with no line
exceeding 80cols.

Usage:
  acme_dns_tiny.py [--contact=MAIL]... [--quiet] [--verbose]
                   [--acme-directory=URL] [--ttl=SECONDS]
                   [--separator=STR]
                   (--account-key=PATH) (--csr=PATH) (--script=PATH)
  acme_dns_tiny.py -h|--help

Options:

  --account-key=PATH    path to your Let's Encrypt account private key
  --csr=PATH            path to your certificate signing request
  --quiet               suppress output except for errors
  --verbose             show all debug information on stderr
  --acme-directory=URL  where to make acme request, default is Let's Encrypt
  --contact=MAIL        create/update contact info (ex "mailto:x@example.com")
  --ttl=SECONDS         time before (re)try self check
  --separator=STR       list all chains joined by STR if specified
  --script=PATH         script to run to update the DNS server record

Called script:

Script is called with the following arguments: action zone challenge

  action     can be "add" or "delete"
  zone       in the form _acme-challenge.domain. domains are extracted from CSR
  challenge  given by the ACME server, to be set as a TXT record

Example:

|  path/to/acme_dns_tiny.py \\
|        --account-key=./account.key \\
|        --csr=./domain.csr \\
|        --contact=contact@domain \\
|        --ttl=300 \\
|        --script=./update-dns-record > signed.crt

Example Crontab Renewal (once per month):

0 0 1 * * path/to/acme_dns_tiny.py --account-key /path/to/account.key \
  --csr /path/to/domain.csr > /path/to/signed.crt \
  2>> /var/log/acme_tiny.log

Example of called script:

#!/bin/bash
case "$1" in
    add) ssh authoritative.dns.host nsupdate \
            <<<$"server ::1\\nupdate add $2 60 TXT $3\\nsend";;
    delete) ssh authoritative.dns.host nsupdate \
            <<<$"server ::1\\nupdate delete $2 TXT";;
esac
"""


import subprocess
import json
import base64
import time
import hashlib
import re
import logging
import collections
import contextlib

import docopt
import requests
import dns
import dns.resolver as resolver


# ACME directory URL used by get_crt() when --acme-directory is not given.
ACME_DEFAULT_DIRECTORY = 'https://acme-v02.api.letsencrypt.org/directory'

# Consistent logging: one logger instance shared across this whole module.
log = logging.getLogger('acme_dns_tiny')
# When true, main() attaches a stream handler to `log` and sets its level.
log_configure = True # can be switched off, e.g. by unit tests


def b64(b):
    """Return the URL-safe base64 text of bytes `b` without '=' padding.

    This is the base64url form used for the JWS fields built in this file.
    """
    encoded = base64.urlsafe_b64encode(b)
    return str(encoded, "utf8").rstrip("=")

def b64sha256(payload):
    """Return the unpadded base64url SHA-256 digest of the text `payload`.

    The text is encoded as UTF-8 before hashing.
    """
    hasher = hashlib.sha256()
    hasher.update(payload.encode('utf8'))
    return b64(hasher.digest())

def b64json(obj):
    """Serialize `obj` to JSON and return it as unpadded base64url text."""
    serialized = json.dumps(obj)
    return b64(serialized.encode("utf8"))

def openssl(subcommand, stdin=None):
    """Run `openssl <subcommand...>` and return what it wrote to stdout.

    subcommand -- list of command line arguments passed after "openssl"
    stdin      -- optional text fed to the process on its standard input

    When the arguments contain '-sign' or 'DER' the process is run in binary
    mode: `stdin` is UTF-8 encoded and the result is bytes. Otherwise input
    and output are UTF-8 text.

    Raises subprocess.CalledProcessError on a non-zero exit status.
    """
    binary = '-sign' in subcommand or 'DER' in subcommand
    if binary:
        data = None if stdin is None else stdin.encode('utf8')
        encoding = None
    else:
        data = stdin
        encoding = 'utf8'
    completed = subprocess.run(
        ['openssl'] + subcommand,
        input=data,
        stdout=subprocess.PIPE,
        check=True,
        encoding=encoding,
    )
    return completed.stdout

def dns_script(script=None, action=None, zone=None, content=None):
    """Run the DNS update script as `script action zone content`.

    The call is logged first. Raises subprocess.CalledProcessError when the
    script exits with a non-zero status.
    """
    command = [script, action, zone, content]
    log.info('Calling script: "%s %s %s %s"', *command)
    subprocess.run(command, check=True)

def extract_domains_from_csr(csr_path):
    """Return the set of domain names found in the CSR at `csr_path`.

    Names are read from the text dump produced by `openssl req -text`: the
    subject CN plus every DNS entry of the first Subject Alternative Name
    line. Raises ValueError when no name is found.
    """
    log.info('Extract domains from CSR.')
    csr_text = openssl(['req', '-in', csr_path, '-noout', '-text'])
    names = set(
        re.findall(r'Subject:.*?\s+?CN\s*?=\s*?([^\s,;/]+)', csr_text))
    san_lines = re.findall(
        r'X509v3 Subject Alternative Name:\s+([^\r\n]+)\r?\n',
        csr_text, re.MULTILINE)
    if san_lines:
        # only the first SAN line is considered
        names.update(re.findall(r'DNS:\s*(\S+)\s*(?:,|$)', san_lines[0]))
    if not names:
        raise ValueError('No domain to validate in the provided CSR.')
    return names

def self_challenge_check(*, domain=None, ttl=None, challenge=None):
    """Wait until the TXT record `domain` holds `challenge`.

    Up to 10 attempts are made, each one preceded by a sleep of `ttl`
    seconds. DNS errors are logged at debug level and count as a failed
    attempt. Raises ValueError when the value is never seen.
    """
    log.info('Self testing %s with challenge "%s"', domain, challenge)
    expected = f'"{challenge}"'  # TXT values are rendered with quotes
    for attempt in range(10):
        log.info('Sleep 1 TTL (%ss) before self check', ttl)
        time.sleep(ttl)
        log.debug('Self check number %s', attempt)
        try:
            try:
                rrset = resolver.resolve(domain, rdtype='TXT').rrset
            except AttributeError:
                # backward compatibility for older dnspython releases
                rrset = resolver.query(domain, rdtype='TXT').rrset
        except dns.exception.DNSException as error:
            log.debug('  - DNS error: %s: %s', type(error).__name__, error)
            continue
        found = [record.to_text() for record in rrset]
        log.debug('  - Found values: %s', found)
        if expected in found:
            return
    raise ValueError(f'Challenge not found: {challenge}')


class ACME:
    """Small ACME client signing its requests with an RSA account key.

    Typical call order (see get_crt): init_sreqs(), register_account(),
    new_order(), then for each authorization get_auth() and
    validate_challenge(), and finally validate_order() and get_chains().
    """

    # Result type of sreq(); built once instead of on every request.
    _Response = collections.namedtuple(
        'sreq', ['code', 'headers', 'map', 'text'])

    def __init__(self, *, account_key_path=None):
        """Store the account key path and prepare the HTTP headers.

        No I/O is done here; init_sreqs() must be called before any signed
        request is sent.
        """
        self.directory = None
        self.account_key_path = account_key_path
        self.dirheaders = headers = {
            'User-Agent': 'acme-dns-tiny/2.1',
            'Accept-Language': 'en',
        }
        self.headers = {**headers, 'Content-Type': 'application/jose+json'}
        self.jws_header = None
        self.jwk_thumbprint = None

    @staticmethod
    def _retry_delay(headers, default=5):
        """Return the polling delay in seconds taken from Retry-After.

        HTTP header values are strings, so the value is converted to an
        integer. A missing, non-numeric (e.g. HTTP-date) or negative value
        yields `default`.
        """
        try:
            delay = int(headers.get('Retry-After', default))
        except (TypeError, ValueError):
            return default
        return delay if delay >= 0 else default

    def generate_jws_header(self, account_key_path):
        """Build the JWS protected header and JWK thumbprint from the key.

        The modulus and public exponent are parsed from the text output of
        `openssl pkey`. Raises ValueError when they cannot be found (for
        example when the key is not an RSA key).
        """
        log.debug('Read account key.')
        out = openssl(['pkey', '-in', account_key_path, '-noout', '-text'])
        match = re.search(
            r'modulus:\r?\n\s+00:([a-f0-9\:\s]+?)'
            r'\r?\npublicExponent: ([0-9]+)',
            out, re.MULTILINE | re.DOTALL)
        if match is None:
            raise ValueError(
                'Unable to read an RSA public key from the account key.')
        pub_hex, pub_exp = match.groups()
        self.jws_header = header = {
            'alg': 'RS256',
            'jwk': {
                # big-endian exponent without leading zero bytes
                'e': b64(int(pub_exp).to_bytes(100, 'big').lstrip(b'\0')),
                'kty': 'RSA',
                'n': b64(bytes.fromhex(re.sub(r'(\s|:)', '', pub_hex))),
            },
            'nonce': None,
        }
        # thumbprint input: JWK members sorted by key, no whitespace
        self.jwk_thumbprint = b64sha256(json.dumps(
            header['jwk'], sort_keys=True, separators=(',', ':')))

    def request_new_nonce(self):
        """Fetch a fresh anti-replay nonce and store it in the JWS header."""
        log.info('Request new nonce')
        response = requests.get(
            self.directory['newNonce'], headers=self.dirheaders)
        self.jws_header['nonce'] = response.headers['Replay-Nonce']

    def init_sreqs(self, directory_url=None):
        """Prepare for signed requests: key header, directory, first nonce."""
        self.generate_jws_header(self.account_key_path)
        log.info('Fetch information from the ACME directory.')
        self.directory = requests.get(
            directory_url, headers=self.dirheaders).json()
        self.request_new_nonce()

    def sreq(self, url=None, payload=None, headers=None, max_retry=10):
        """Send a signed request to the ACME server.

        url       -- resource to POST to
        payload   -- JSON-serializable body; None means POST-as-GET
        headers   -- extra HTTP headers merged over the default ones
        max_retry -- remaining retries allowed on badNonce errors

        Returns a namedtuple (code, headers, map, text) where `map` is the
        decoded JSON body ({} when the body is not JSON).
        Raises ValueError when the request cannot be sent, when the status
        is neither 200 nor 201, or when retries are exhausted.
        """
        if max_retry < 0:
            raise ValueError('max_retry exceeded')
        # POST-as-GET (payload=None != {}), payload64 is an empty string
        payload64 = '' if payload is None else b64json(payload)
        protected64 = b64json({**self.jws_header, 'url': url})
        signature = openssl(
            ['dgst', '-sha256', '-sign', self.account_key_path],
            f'{protected64}.{payload64}')
        jose = {
            'protected': protected64,
            'payload': payload64,
            'signature': b64(signature),
        }
        try:
            req = requests.post(
                url, json=jose, headers={**self.headers, **(headers or {})})
        except requests.exceptions.RequestException as error:
            req = error.response
            if req is None:
                # nothing was received (connection error, timeout, ...)
                raise ValueError(f'Request failed: {url}: {error}') from error
        # Keep the previous nonce when none is returned: if it is rejected,
        # the badNonce retry below fetches a fresh one.
        nonce = req.headers.get('Replay-Nonce')
        if nonce:
            self.jws_header['nonce'] = nonce
        jmap = {}
        with contextlib.suppress(ValueError):
            jmap = req.json()
        if req.status_code not in (200, 201):
            error_type = jmap.get('type') if isinstance(jmap, dict) else None
            if error_type == 'urn:ietf:params:acme:error:badNonce':
                log.info('nonce expired or invalid, retry (left %d)', max_retry)
                self.request_new_nonce()
                return self.sreq(url, payload, headers, max_retry-1)
            raise ValueError(
                f'Request failed: {req.status_code} {jmap}, '
                f'hint: {req.headers.get("Link", "empty")}.')
        return self._Response(req.status_code, req.headers, jmap, req.text)

    def register_account(self, *, contacts=None):
        """Register the account key, or log in if it is already known.

        The terms of service advertised by the directory are accepted
        automatically. When `contacts` is given and differs from the
        contacts held by the server, the account is updated.
        After this call requests are signed with the account URL ('kid')
        instead of the public key ('jwk').
        """
        log.info('Register/Login ACME Account.')
        srv_terms = self.directory.get('meta', {}).get('termsOfService', '')
        account_request = {}
        if srv_terms:
            log.warning('Terms of service auto agreed: %s', srv_terms)
            account_request['termsOfServiceAgreed'] = True
        if contacts:
            account_request['contact'] = contacts
        sreq = self.sreq(self.directory['newAccount'], account_request)
        self.jws_header['kid'] = kid = sreq.headers['Location']
        self.jws_header.pop('jwk', None)
        if sreq.code == 201:
            log.info('  - Registered a new account: "%s"', kid)
        elif sreq.code == 200:
            log.debug('  - Account is already registered: "%s"', kid)
            sreq = self.sreq(kid, {})
        # the account object may carry no 'contact' member at all
        registered = sreq.map.get('contact') or []
        if contacts and set(contacts) != set(registered):
            self.sreq(kid, account_request)
            log.info('  - Account updated with latest contact information.')

    def new_order(self, *, domains=None):
        """Create an order for `domains`.

        Returns (order object, order URL). When the order is already
        'ready' its authorization list is emptied so that callers have no
        challenge to process. Raises ValueError on any unexpected answer.
        """
        log.info('Request a new ACME order.')
        identifiers = [{'type': 'dns', 'value': x} for x in domains]
        sreq = self.sreq(
            self.directory['newOrder'], {'identifiers': identifiers})
        # note: standard says only 201 is returned, assume 200 is not ok
        if sreq.code != 201 or sreq.map['status'] not in ('pending', 'ready'):
            raise ValueError(f'newOrder failed: {sreq.code} {sreq.map}')
        order_location = sreq.headers['Location']
        log.debug('  - Order received: %s', order_location)
        if sreq.map['status'] == 'ready':
            log.info('No challenge to process: order is already ready.')
            sreq.map['authorizations'] = []
        return sreq.map, order_location

    def validate_order(self, finalize_url, order_location, csr_der):
        """Finalize the order and poll it until the certificate is issued.

        `csr_der` is sent as the 'csr' field of the finalize request.
        Returns the final order object. Raises ValueError when the order
        reaches a status other than 'processing' or 'valid'.
        """
        log.info('Request to finalize the order (all chalenge completed)')
        self.sreq(finalize_url, {'csr': csr_der})
        while True:
            sreq = self.sreq(order_location)
            status = sreq.map['status']
            if status == 'processing':
                time.sleep(self._retry_delay(sreq.headers))
            elif status == 'valid':
                log.info('Order finalized')
                break
            else:
                raise ValueError(f'Error finalizing order: {sreq.map}')
        return sreq.map

    def validate_challenge(self, *, url=None, keyauth=None):
        """Ask the server to validate a challenge and poll until it is done.

        Raises ValueError when the challenge ends in a status other than
        'valid'.
        """
        log.info('Asking ACME server to validate challenge')
        self.sreq(url, {'keyAuthorization': keyauth})
        while True:
            sreq = self.sreq(url)
            status = sreq.map['status']
            # 'processing' is the state of a challenge being validated
            if status in ('pending', 'processing'):
                time.sleep(self._retry_delay(sreq.headers))
            elif status == 'valid':
                log.info('ACME has verified challenge for domain.')
                break
            else:
                raise ValueError(f'Challenge for domain failed: {sreq.map}')

    def get_auth(self, authurl):
        """Fetch an authorization and prepare its dns-01 challenge.

        Returns (TXT record name, TXT record value, challenge URL,
        key authorization). Raises ValueError when the authorization
        offers no dns-01 challenge.
        """
        log.info('Process challenge for authorization: %s', authurl)
        auth = self.sreq(authurl).map
        domain = auth['identifier']['value']
        dns_challenges = [
            c for c in auth['challenges'] if c['type'] == 'dns-01']
        if not dns_challenges:
            raise ValueError(
                f'No dns-01 challenge offered for {domain}: {auth}')
        challenge = dns_challenges[0]
        # b64url: replace any character outside the base64url alphabet
        token = re.sub(r'[^A-Za-z0-9_-]', '_', challenge['token'])
        keyauth = f'{token}.{self.jwk_thumbprint}'
        keydigest64 = b64sha256(keyauth)
        record = f'_acme-challenge.{domain}.'
        return record, keydigest64, challenge['url'], keyauth

    def get_chains(self, crt_url, separator=None):
        """Download the issued certificate chain(s) as PEM text.

        Without `separator` only the default chain is returned. Otherwise
        every chain advertised through a Link header with rel="alternate"
        is downloaded as well and all chains are joined with `separator`.
        """
        headers = {'Accept': 'application/pem-certificate-chain'}
        log.info('Signed certificate at: %s', crt_url)
        sreq = self.sreq(crt_url, headers=headers)

        if separator is None:
            return sreq.text

        chains = [sreq.text]
        regex = re.compile(r'\s*<(\S+)>\s*;\s*rel\s*=\s*"alternate"\s*')
        for match in regex.finditer(sreq.headers.get('Link', '')):
            chains.append(self.sreq(match.group(1), headers=headers).text)

        return separator.join(chains)


def get_crt(args):
    """Run the whole issuance and return the certificate chain(s) as text.

    `args` is the docopt-style option mapping; the keys read here are
    --ttl, --csr, --account-key, --acme-directory, --contact, --script and
    --separator.

    For each authorization of the order, the DNS script is called to add
    the TXT record, the record is self-checked, the ACME server is asked to
    validate it, and the record is deleted again whatever the outcome.
    """
    ttl = int(args['--ttl'] or "60")
    domains = extract_domains_from_csr(args['--csr'])
    acme = ACME(account_key_path=args['--account-key'])
    acme.init_sreqs(args['--acme-directory'] or ACME_DEFAULT_DIRECTORY)
    acme.register_account(contacts=args['--contact'])
    order, order_location = acme.new_order(domains=domains)

    log.info('Completing each each authorization challenge')
    for authurl in order['authorizations']:
        domain, keydigest64, challenge_url, keyauth = acme.get_auth(authurl)
        dns_script(args['--script'], 'add', domain, keydigest64)
        try:
            # inside the try block so that a failed self check does not
            # leave the challenge record behind in the DNS zone
            self_challenge_check(domain=domain, ttl=ttl, challenge=keydigest64)
            acme.validate_challenge(url=challenge_url, keyauth=keyauth)
        finally:
            dns_script(args['--script'], 'delete', domain, '')

    csr_der = b64(openssl(['req', '-in', args['--csr'], '-outform', 'DER']))
    order = acme.validate_order(order['finalize'], order_location, csr_der)
    separator = None
    if args['--separator'] is not None:
        # Interpret backslash escapes such as "\n". Going through latin-1
        # with backslashreplace keeps non-ASCII characters intact, which a
        # UTF-8 round trip through unicode_escape would garble.
        separator = args['--separator'].encode(
            'latin-1', 'backslashreplace').decode('unicode_escape')
    chains = acme.get_chains(order['certificate'], separator)

    return chains


def main(args):
    """Command line entry point: print the obtained certificate chain(s).

    A falsy `args` makes the options be parsed from the command line with
    docopt. When the module flag `log_configure` is set, a stream handler is
    attached to the module logger and its level is chosen from --verbose
    (DEBUG), --quiet (ERROR) or INFO otherwise.
    """
    if not args:
        args = docopt.docopt(__doc__)
    if log_configure:
        log.addHandler(logging.StreamHandler())
        if args['--verbose']:
            level = logging.DEBUG
        elif args['--quiet']:
            level = logging.ERROR
        else:
            level = logging.INFO
        log.setLevel(level)
    print(get_crt(args), end='')


# Script entry point: passing None makes main() parse the command line.
if __name__ == "__main__":
    main(None)