diff options
-rw-r--r-- | acme_dns_tiny.py | 33 | ||||
-rw-r--r-- | requirements.txt | 3 | ||||
-rw-r--r-- | tests/test_acme_dns_tiny.py | 121 |
3 files changed, 99 insertions, 58 deletions
diff --git a/acme_dns_tiny.py b/acme_dns_tiny.py index 051e6ba..9922e45 100644 --- a/acme_dns_tiny.py +++ b/acme_dns_tiny.py @@ -9,7 +9,7 @@ please read through it. It's about 326 lines, 225 SLOC with no line exceeding 80cols. Usage: - acme_dns_tiny.py [--contact=MAIL]... [--quiet] [--verbose] + acme_dns_tiny.py [--quiet] [--verbose] [--acme-directory=URL] [--ttl=SECONDS] [--separator=STR] (--account-key=PATH) (--csr=PATH) (--script=PATH) @@ -22,7 +22,6 @@ Options: --quiet suppress output except for errors --verbose show all debug information on stderr --acme-directory=URL where to make acme request, default is Let's Encrypt - --contact=MAIL create/update contact info (ex "mailto:x@example.com") --ttl=SECONDS time before (re)try self check --separator=STR list all chains joined by STR if specified --script=PATH script to run to update the DNS server record @@ -40,7 +39,6 @@ Example: | path/to/ \\ | --account-key=./account.key \\ | --csr=./domain.csr \\ -| --contact=contact@domain \\ | --ttl=300 \\ | --script=./update-dns-record > signed.crt @@ -177,17 +175,22 @@ class ACME: self.jwk_thumbprint = b64sha256(json.dumps(header['jwk'], sort_keys=True, separators=(',', ':'))) + def request_new_nonce(self): + log.info('Request new nonce') + self.jws_header['nonce'] = requests.get(self.directory["newNonce"] + ).headers['Replay-Nonce'] + def init_sreqs(self, directory_url=None): self.generate_jws_header(self.account_key_path) log.info('Fetch information from the ACME directory.') self.directory = dirmap = requests.get(directory_url, headers=self.dirheaders).json() - log.info('Request first nonce') - self.jws_header['nonce'] = requests.get(dirmap["newNonce"] - ).headers['Replay-Nonce'] + self.request_new_nonce() - def sreq(self, url=None, payload=None, headers=None): + def sreq(self, url=None, payload=None, headers=None, max_retry=10): """Sends signed requests to ACME server.""" + if max_retry < 0: + raise ValueError('max_retry exceeded') # POST-as-GET (payload=None != {}), payload64 is an empty string payload64 = '' if payload is None else b64json(payload) protected64 = b64json({**self.jws_header, 'url': url}) @@ -207,19 +210,22 @@ class ACME: with contextlib.suppress(ValueError): jmap = req.json() if req.status_code not in (200, 201): + if jmap['type'] == 'urn:ietf:params:acme:error:badNonce': + log.info('nonce expired or invalid, retry (left %d)', max_retry) + self.request_new_nonce() + return self.sreq(url, payload, headers, max_retry-1) raise ValueError(f'Request failed: {req.status_code} {jmap}, ' 'hint: {sreq.headers.get("Link", "empty")}.') return collections.namedtuple('sreq', ['code', 'headers', 'map', 'text'])(req.status_code, req.headers, jmap, req.text) - def register_account(self, *, contacts=None): + def register_account(self): log.info('Register/Login ACME Account.') srv_terms = self.directory.get('meta', {}).get('termsOfService', '') if srv_terms: log.warning('Terms of service auto agreed: %s', srv_terms) account_request = { **dict({'termsOfServiceAgreed': True} if srv_terms else {}), - **dict({'contact': contacts} if contacts else {}), } sreq = self.sreq(self.directory['newAccount'], account_request) self.jws_header['kid'] = kid = sreq.headers['Location'] @@ -229,9 +235,6 @@ class ACME: elif sreq.code == 200: log.debug(' - Account is already registered: "%s"', kid) sreq = self.sreq(self.jws_header['kid'], {}) - if contacts and (set(contacts) != set(sreq.map['contact'])): - self.sreq(self.jws_header["kid"], account_request) - log.info(' - Account updated with latest contact information.') def new_order(self, *, domains=None): log.info('Request a new ACME order.') @@ -253,7 +256,7 @@ class ACME: while True: sreq = self.sreq(order_location) if sreq.map['status'] == 'processing': - time.sleep(sreq.headers.get('Retry-After', 5)) + time.sleep(int(sreq.headers.get('Retry-After', 5))) elif sreq.map['status'] == 'valid': log.info('Order finalized') break @@ -267,7 +270,7 @@ class ACME: while True: sreq = self.sreq(url) if sreq.map['status'] == 'pending': - time.sleep(sreq.headers.get('Retry-After', 5)) + time.sleep(int(sreq.headers.get('Retry-After', 5))) elif sreq.map['status'] == 'valid': log.info('ACME has verified challenge for domain.') break @@ -306,7 +309,7 @@ def get_crt(args): domains = extract_domains_from_csr(args['--csr']) acme = ACME(account_key_path=args['--account-key']) acme.init_sreqs(args['--acme-directory'] or ACME_DEFAULT_DIRECTORY) - acme.register_account(contacts=args['--contact']) + acme.register_account() order, order_location = acme.new_order(domains=domains) log.info('Completing each each authorization challenge') diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..6342311 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +docopt +requests +dnspython diff --git a/tests/test_acme_dns_tiny.py b/tests/test_acme_dns_tiny.py index 0d61b97..5a4a479 100644 --- a/tests/test_acme_dns_tiny.py +++ b/tests/test_acme_dns_tiny.py @@ -1,9 +1,7 @@ #!python3 -import collections import contextlib -import inspect import logging import os import subprocess @@ -26,7 +24,6 @@ acme_dns_tiny.log_configure = False ACME_STAGING_DIRECTORY = \ 'https://acme-staging-v02.api.letsencrypt.org/directory' DOMAIN = os.getenv('TEST_DOMAIN') -CONTACT = os.getenv('TEST_CONTACT') # form: mailto:name@domain SCRIPT = os.getenv('TEST_SCRIPT') @@ -39,6 +36,28 @@ key_types = ( ('ecdsa', 256, False), ) +# the first case of success expectation, try also to test the anti-replay +# nonce expiration retry mechanism by waiting for it to expire (more than +# 5min). +first_success_case_nonce_timeout_done = False +original_sreq_method = acme_dns_tiny.ACME.sreq + + +nonce_expiration_sreq_wrapper_counter_max = 3 +nonce_expiration_sreq_wrapper_counter = nonce_expiration_sreq_wrapper_counter_max +def nonce_expiration_sreq_wrapper(*args, **kwargs): + global nonce_expiration_sreq_wrapper_counter + nonce_expiration_sreq_wrapper_counter -= 1 + if nonce_expiration_sreq_wrapper_counter <= 0: + nonce_expiration_sreq_wrapper_counter = nonce_expiration_sreq_wrapper_counter_max + wait = 30 + logging.info('waiting %dmin for nonce expiration test', wait) + for i in range(wait, 0, -1): + logging.info('%d min left', i) + time.sleep(60) + acme_dns_tiny.ACME.sreq = original_sreq_method # reset original method + return original_sreq_method(*args, **kwargs) + @contextlib.contextmanager def does_not_raise(): @@ -118,16 +137,16 @@ def keys_generator(): @pytest.fixture(scope='module', params=keys_generator()) -def account_domain_key(request): +def keysdef(request): return request.param @pytest.fixture(scope='module', params=[ {'subj': f'/CN={DOMAIN}', 'san': None}, {'subj': f'/CN=*.{DOMAIN}', 'san': None}, - {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:www.{DOMAIN}'}, + {'subj': '/', 'san': f'DNS:{DOMAIN},DNS:www.{DOMAIN}'}, {'subj': f'/CN={DOMAIN}', 'san': f'DNS:www.{DOMAIN}'}, - {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:*.{DOMAIN}'}, + {'subj': '/', 'san': f'DNS:{DOMAIN},DNS:*.{DOMAIN}'}, ]) def subj_fixture(request): return request.param @@ -136,42 +155,32 @@ def subj_fixture(request): CSR_ARGS_COUNTER = 0 @pytest.fixture(scope='module', params=[ - pytest.param(None, id='no_separator/no_contact'), + pytest.param({}, id='no_separator'), pytest.param({'args': {'--separator': '\\0'}}, id='separator'), - pytest.param({'args': {'--contact': [CONTACT]}}, id='contact'), - pytest.param({'args': { - '--contact': [CONTACT], - '--separator': '\\0', - }}, id='separator+contact'), ]) -def main_args(tmpdir_factory, account_domain_key, request, subj_fixture): +def main_args(tmpdir_factory, keysdef, request, subj_fixture): global CSR_ARGS_COUNTER CSR_ARGS_COUNTER += 1 tmpdir = tmpdir_factory.mktemp("data") - #keysdef = account_domain_key.values[0] - keysdef = account_domain_key - #print('debug', keysdef) - default_args = { + account_key = keysdef['account_key_path'](tmpdir) + args = { '--acme-directory': ACME_STAGING_DIRECTORY, '--script': SCRIPT, - '--account-key': keysdef['account_key_path'](tmpdir), + '--account-key': account_key, #'--ttl': 60, # already the default '--ttl': None, '--separator': None, - '--contact': None, '--verbose': None, '--quiet': None, + **request.param.get('args', {}) } - if request.param: - default_args.update(request.param.get('args', {})) - - if default_args['--separator'] is not None: - assert default_args['--separator'] == '\\0' - assert default_args['--separator'].encode( + if args['--separator'] is not None: + assert args['--separator'] == '\\0' + assert args['--separator'].encode( 'utf8').decode('unicode_escape') == '\0' name = f'{CSR_ARGS_COUNTER:02X}' @@ -191,13 +200,16 @@ def main_args(tmpdir_factory, account_domain_key, request, subj_fixture): '-out', path ]) - return {**default_args, '--csr': path}, keysdef['raise_expected'] + return {**args, '--csr': path}, { + **keysdef, + 'account_key_path': account_key, # expanded version + 'domain_key_path': domain_key, # expanded version + } def test_sanity_env(): 'check environment is correctly set before running other tests' assert bool(DOMAIN) - assert bool(CONTACT) assert bool(SCRIPT) @@ -206,19 +218,33 @@ def test_sanity_command(): subprocess.run([SCRIPT, 'add', f'_acme-challenge.{DOMAIN}.', 'dummy']) -def assert_cert(capsys, args): +def assert_cert(capsys, args, keysdef): captured = capsys.readouterr() #assert not captured.err #certlist = captured.out.split() - if args['--separator'] is None: + logging.debug('captured stdout %s', captured.out) + logging.debug('captured stderr %s', captured.err) + + # Subscriber certificates with RSA public keys are issued from our RSA + # intermediates, which are issued only from our RSA root ISRG Root X1 + # (i.e. they are not cross-signed). Therefore, all RSA subscriber certificates + # have only a single chain available. (since 2025-06-11, see here for more + # information https://letsencrypt.org/certificates/). + if ( + args['--separator'] is None + or ( + args['--separator'] is not None + and '/domain_rsa_' in keysdef['domain_key_path'] + ) + ): assert '\0' not in captured.out else: assert '\0' in captured.out for chain in captured.out.split('\0'): - assert chain.count('-----BEGIN CERTIFICATE-----') == 2 - assert chain.count('-----END CERTIFICATE-----') == 2 + assert chain.count('-----BEGIN CERTIFICATE-----') >= 2 + assert chain.count('-----END CERTIFICATE-----') >= 2 textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'], stdin=chain) @@ -228,33 +254,42 @@ def assert_cert(capsys, args): 'certtool', '-e', '--verify-profile', 'low'], input=chain, encoding='utf8') assert ' Verified.' in certtool_out - assert certtool_out.count('Subject:') == 3 + assert certtool_out.count('Subject:') >= 3 -def module_main_caller(*, capsys, args, expectation): +def module_main_caller(*, capsys, args, expectation, do_expire_nonce, keysdef): logging.info(f'module_main_caller({args}, {expectation})') logging.debug('before call to acme_dns_tiny.main()') with expectation: + acme_dns_tiny.ACME.sreq = original_sreq_method + if do_expire_nonce: + logging.info('doing expire nonce test') + acme_dns_tiny.ACME.sreq = nonce_expiration_sreq_wrapper acme_dns_tiny.main(args) # check_cert is under the expectation context manager since if # acme_dns_tiny.main() raises, following statement must not be run. - assert_cert(capsys, args) + assert_cert(capsys, args, keysdef) logging.debug('after call to acme_dns_tiny.main()') def test_main(main_args, capsys): - t_start = time.time() + #t_start = time.time() args = main_args[0] - raise_expected = main_args[1] + keysdef = main_args[1] + raise_expected = keysdef['raise_expected'] + do_expire_nonce = False #print('subj', subj, 'args', args) expectation = does_not_raise() if raise_expected: expectation = pytest.raises(ValueError) - module_main_caller(capsys=capsys, args=args, expectation=expectation) - t_stop = time.time() - - # calculate for letsencrypt rate limit (50account per hour) - t_diff = 216 - (t_stop - t_start) - if t_diff > 0: - time.sleep(t_diff) + elif not first_success_case_nonce_timeout_done: + do_expire_nonce = True + module_main_caller(capsys=capsys, args=args, expectation=expectation, + do_expire_nonce=do_expire_nonce, + keysdef=keysdef) + #t_stop = time.time() + ## calculate for letsencrypt rate limit (50account per hour) + #t_diff = 216 - (t_stop - t_start) + #if t_diff > 0 and os.environ.get('ACCOUNT_SLEEP_SKIP') != '1': + # time.sleep(t_diff) |