summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--acme_dns_tiny.py33
-rw-r--r--requirements.txt3
-rw-r--r--tests/test_acme_dns_tiny.py121
3 files changed, 99 insertions, 58 deletions
diff --git a/acme_dns_tiny.py b/acme_dns_tiny.py
index 051e6ba..9922e45 100644
--- a/acme_dns_tiny.py
+++ b/acme_dns_tiny.py
@@ -9,7 +9,7 @@ please read through it. It's about 326 lines, 225 SLOC with no line
exceeding 80cols.
Usage:
- acme_dns_tiny.py [--contact=MAIL]... [--quiet] [--verbose]
+ acme_dns_tiny.py [--quiet] [--verbose]
[--acme-directory=URL] [--ttl=SECONDS]
[--separator=STR]
(--account-key=PATH) (--csr=PATH) (--script=PATH)
@@ -22,7 +22,6 @@ Options:
--quiet suppress output except for errors
--verbose show all debug information on stderr
--acme-directory=URL where to make acme request, default is Let's Encrypt
- --contact=MAIL create/update contact info (ex "mailto:x@example.com")
--ttl=SECONDS time before (re)try self check
--separator=STR list all chains joined by STR if specified
--script=PATH script to run to update the DNS server record
@@ -40,7 +39,6 @@ Example:
| path/to/ \\
| --account-key=./account.key \\
| --csr=./domain.csr \\
-| --contact=contact@domain \\
| --ttl=300 \\
| --script=./update-dns-record > signed.crt
@@ -177,17 +175,22 @@ class ACME:
self.jwk_thumbprint = b64sha256(json.dumps(header['jwk'],
sort_keys=True, separators=(',', ':')))
+ def request_new_nonce(self):
+ log.info('Request new nonce')
+ self.jws_header['nonce'] = requests.get(self.directory["newNonce"]
+ ).headers['Replay-Nonce']
+
def init_sreqs(self, directory_url=None):
self.generate_jws_header(self.account_key_path)
log.info('Fetch information from the ACME directory.')
self.directory = dirmap = requests.get(directory_url,
headers=self.dirheaders).json()
- log.info('Request first nonce')
- self.jws_header['nonce'] = requests.get(dirmap["newNonce"]
- ).headers['Replay-Nonce']
+ self.request_new_nonce()
- def sreq(self, url=None, payload=None, headers=None):
+ def sreq(self, url=None, payload=None, headers=None, max_retry=10):
"""Sends signed requests to ACME server."""
+ if max_retry < 0:
+ raise ValueError('max_retry exceeded')
# POST-as-GET (payload=None != {}), payload64 is an empty string
payload64 = '' if payload is None else b64json(payload)
protected64 = b64json({**self.jws_header, 'url': url})
@@ -207,19 +210,22 @@ class ACME:
with contextlib.suppress(ValueError):
jmap = req.json()
if req.status_code not in (200, 201):
+ if jmap['type'] == 'urn:ietf:params:acme:error:badNonce':
+ log.info('nonce expired or invalid, retry (left %d)', max_retry)
+ self.request_new_nonce()
+ return self.sreq(url, payload, headers, max_retry-1)
raise ValueError(f'Request failed: {req.status_code} {jmap}, '
'hint: {sreq.headers.get("Link", "empty")}.')
return collections.namedtuple('sreq', ['code', 'headers', 'map',
'text'])(req.status_code, req.headers, jmap, req.text)
- def register_account(self, *, contacts=None):
+ def register_account(self):
log.info('Register/Login ACME Account.')
srv_terms = self.directory.get('meta', {}).get('termsOfService', '')
if srv_terms:
log.warning('Terms of service auto agreed: %s', srv_terms)
account_request = {
**dict({'termsOfServiceAgreed': True} if srv_terms else {}),
- **dict({'contact': contacts} if contacts else {}),
}
sreq = self.sreq(self.directory['newAccount'], account_request)
self.jws_header['kid'] = kid = sreq.headers['Location']
@@ -229,9 +235,6 @@ class ACME:
elif sreq.code == 200:
log.debug(' - Account is already registered: "%s"', kid)
sreq = self.sreq(self.jws_header['kid'], {})
- if contacts and (set(contacts) != set(sreq.map['contact'])):
- self.sreq(self.jws_header["kid"], account_request)
- log.info(' - Account updated with latest contact information.')
def new_order(self, *, domains=None):
log.info('Request a new ACME order.')
@@ -253,7 +256,7 @@ class ACME:
while True:
sreq = self.sreq(order_location)
if sreq.map['status'] == 'processing':
- time.sleep(sreq.headers.get('Retry-After', 5))
+ time.sleep(int(sreq.headers.get('Retry-After', 5)))
elif sreq.map['status'] == 'valid':
log.info('Order finalized')
break
@@ -267,7 +270,7 @@ class ACME:
while True:
sreq = self.sreq(url)
if sreq.map['status'] == 'pending':
- time.sleep(sreq.headers.get('Retry-After', 5))
+ time.sleep(int(sreq.headers.get('Retry-After', 5)))
elif sreq.map['status'] == 'valid':
log.info('ACME has verified challenge for domain.')
break
@@ -306,7 +309,7 @@ def get_crt(args):
domains = extract_domains_from_csr(args['--csr'])
acme = ACME(account_key_path=args['--account-key'])
acme.init_sreqs(args['--acme-directory'] or ACME_DEFAULT_DIRECTORY)
- acme.register_account(contacts=args['--contact'])
+ acme.register_account()
order, order_location = acme.new_order(domains=domains)
log.info('Completing each each authorization challenge')
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..6342311
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+docopt
+requests
+dnspython
diff --git a/tests/test_acme_dns_tiny.py b/tests/test_acme_dns_tiny.py
index 0d61b97..5a4a479 100644
--- a/tests/test_acme_dns_tiny.py
+++ b/tests/test_acme_dns_tiny.py
@@ -1,9 +1,7 @@
#!python3
-import collections
import contextlib
-import inspect
import logging
import os
import subprocess
@@ -26,7 +24,6 @@ acme_dns_tiny.log_configure = False
ACME_STAGING_DIRECTORY = \
'https://acme-staging-v02.api.letsencrypt.org/directory'
DOMAIN = os.getenv('TEST_DOMAIN')
-CONTACT = os.getenv('TEST_CONTACT') # form: mailto:name@domain
SCRIPT = os.getenv('TEST_SCRIPT')
@@ -39,6 +36,28 @@ key_types = (
('ecdsa', 256, False),
)
+# the first case of success expectation, try also to test the anti-replay
+# nonce expiration retry mechanism by waiting for it to expire (more than
+# 5min).
+first_success_case_nonce_timeout_done = False
+original_sreq_method = acme_dns_tiny.ACME.sreq
+
+
+nonce_expiration_sreq_wrapper_counter_max = 3
+nonce_expiration_sreq_wrapper_counter = nonce_expiration_sreq_wrapper_counter_max
+def nonce_expiration_sreq_wrapper(*args, **kwargs):
+ global nonce_expiration_sreq_wrapper_counter
+ nonce_expiration_sreq_wrapper_counter -= 1
+ if nonce_expiration_sreq_wrapper_counter <= 0:
+ nonce_expiration_sreq_wrapper_counter = nonce_expiration_sreq_wrapper_counter_max
+ wait = 30
+ logging.info('waiting %dmin for nonce expiration test', wait)
+ for i in range(wait, 0, -1):
+ logging.info('%d min left', i)
+ time.sleep(60)
+ acme_dns_tiny.ACME.sreq = original_sreq_method # reset original method
+ return original_sreq_method(*args, **kwargs)
+
@contextlib.contextmanager
def does_not_raise():
@@ -118,16 +137,16 @@ def keys_generator():
@pytest.fixture(scope='module', params=keys_generator())
-def account_domain_key(request):
+def keysdef(request):
return request.param
@pytest.fixture(scope='module', params=[
{'subj': f'/CN={DOMAIN}', 'san': None},
{'subj': f'/CN=*.{DOMAIN}', 'san': None},
- {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:www.{DOMAIN}'},
+ {'subj': '/', 'san': f'DNS:{DOMAIN},DNS:www.{DOMAIN}'},
{'subj': f'/CN={DOMAIN}', 'san': f'DNS:www.{DOMAIN}'},
- {'subj': f'/', 'san': f'DNS:{DOMAIN},DNS:*.{DOMAIN}'},
+ {'subj': '/', 'san': f'DNS:{DOMAIN},DNS:*.{DOMAIN}'},
])
def subj_fixture(request):
return request.param
@@ -136,42 +155,32 @@ def subj_fixture(request):
CSR_ARGS_COUNTER = 0
@pytest.fixture(scope='module', params=[
- pytest.param(None, id='no_separator/no_contact'),
+ pytest.param({}, id='no_separator'),
pytest.param({'args': {'--separator': '\\0'}}, id='separator'),
- pytest.param({'args': {'--contact': [CONTACT]}}, id='contact'),
- pytest.param({'args': {
- '--contact': [CONTACT],
- '--separator': '\\0',
- }}, id='separator+contact'),
])
-def main_args(tmpdir_factory, account_domain_key, request, subj_fixture):
+def main_args(tmpdir_factory, keysdef, request, subj_fixture):
global CSR_ARGS_COUNTER
CSR_ARGS_COUNTER += 1
tmpdir = tmpdir_factory.mktemp("data")
- #keysdef = account_domain_key.values[0]
- keysdef = account_domain_key
- #print('debug', keysdef)
- default_args = {
+ account_key = keysdef['account_key_path'](tmpdir)
+ args = {
'--acme-directory': ACME_STAGING_DIRECTORY,
'--script': SCRIPT,
- '--account-key': keysdef['account_key_path'](tmpdir),
+ '--account-key': account_key,
#'--ttl': 60, # already the default
'--ttl': None,
'--separator': None,
- '--contact': None,
'--verbose': None,
'--quiet': None,
+ **request.param.get('args', {})
}
- if request.param:
- default_args.update(request.param.get('args', {}))
-
- if default_args['--separator'] is not None:
- assert default_args['--separator'] == '\\0'
- assert default_args['--separator'].encode(
+ if args['--separator'] is not None:
+ assert args['--separator'] == '\\0'
+ assert args['--separator'].encode(
'utf8').decode('unicode_escape') == '\0'
name = f'{CSR_ARGS_COUNTER:02X}'
@@ -191,13 +200,16 @@ def main_args(tmpdir_factory, account_domain_key, request, subj_fixture):
'-out', path
])
- return {**default_args, '--csr': path}, keysdef['raise_expected']
+ return {**args, '--csr': path}, {
+ **keysdef,
+ 'account_key_path': account_key, # expanded version
+ 'domain_key_path': domain_key, # expanded version
+ }
def test_sanity_env():
'check environment is correctly set before running other tests'
assert bool(DOMAIN)
- assert bool(CONTACT)
assert bool(SCRIPT)
@@ -206,19 +218,33 @@ def test_sanity_command():
subprocess.run([SCRIPT, 'add', f'_acme-challenge.{DOMAIN}.', 'dummy'])
-def assert_cert(capsys, args):
+def assert_cert(capsys, args, keysdef):
captured = capsys.readouterr()
#assert not captured.err
#certlist = captured.out.split()
- if args['--separator'] is None:
+ logging.debug('captured stdout %s', captured.out)
+ logging.debug('captured stderr %s', captured.err)
+
+ # Subscriber certificates with RSA public keys are issued from our RSA
+ # intermediates, which are issued only from our RSA root ISRG Root X1
+ # (i.e. they are not cross-signed). Therefore, all RSA subscriber certificates
+ # have only a single chain available. (since 2025-06-11, see here for more
+ # information https://letsencrypt.org/certificates/).
+ if (
+ args['--separator'] is None
+ or (
+ args['--separator'] is not None
+ and '/domain_rsa_' in keysdef['domain_key_path']
+ )
+ ):
assert '\0' not in captured.out
else:
assert '\0' in captured.out
for chain in captured.out.split('\0'):
- assert chain.count('-----BEGIN CERTIFICATE-----') == 2
- assert chain.count('-----END CERTIFICATE-----') == 2
+ assert chain.count('-----BEGIN CERTIFICATE-----') >= 2
+ assert chain.count('-----END CERTIFICATE-----') >= 2
textchain = acme_dns_tiny.openssl(['x509', '-text', '-noout'],
stdin=chain)
@@ -228,33 +254,42 @@ def assert_cert(capsys, args):
'certtool', '-e', '--verify-profile', 'low'], input=chain,
encoding='utf8')
assert ' Verified.' in certtool_out
- assert certtool_out.count('Subject:') == 3
+ assert certtool_out.count('Subject:') >= 3
-def module_main_caller(*, capsys, args, expectation):
+def module_main_caller(*, capsys, args, expectation, do_expire_nonce, keysdef):
logging.info(f'module_main_caller({args}, {expectation})')
logging.debug('before call to acme_dns_tiny.main()')
with expectation:
+ acme_dns_tiny.ACME.sreq = original_sreq_method
+ if do_expire_nonce:
+ logging.info('doing expire nonce test')
+ acme_dns_tiny.ACME.sreq = nonce_expiration_sreq_wrapper
acme_dns_tiny.main(args)
# check_cert is under the expectation context manager since if
# acme_dns_tiny.main() raises, following statement must not be run.
- assert_cert(capsys, args)
+ assert_cert(capsys, args, keysdef)
logging.debug('after call to acme_dns_tiny.main()')
def test_main(main_args, capsys):
- t_start = time.time()
+ #t_start = time.time()
args = main_args[0]
- raise_expected = main_args[1]
+ keysdef = main_args[1]
+ raise_expected = keysdef['raise_expected']
+ do_expire_nonce = False
#print('subj', subj, 'args', args)
expectation = does_not_raise()
if raise_expected:
expectation = pytest.raises(ValueError)
- module_main_caller(capsys=capsys, args=args, expectation=expectation)
- t_stop = time.time()
-
- # calculate for letsencrypt rate limit (50account per hour)
- t_diff = 216 - (t_stop - t_start)
- if t_diff > 0:
- time.sleep(t_diff)
+ elif not first_success_case_nonce_timeout_done:
+ do_expire_nonce = True
+ module_main_caller(capsys=capsys, args=args, expectation=expectation,
+ do_expire_nonce=do_expire_nonce,
+ keysdef=keysdef)
+ #t_stop = time.time()
+ ## calculate for letsencrypt rate limit (50account per hour)
+ #t_diff = 216 - (t_stop - t_start)
+ #if t_diff > 0 and os.environ.get('ACCOUNT_SLEEP_SKIP') != '1':
+ # time.sleep(t_diff)