Source code for dxaws_acm.planner
"""Planner for ACM manager lifecycle (v0.1.0)."""
from __future__ import annotations
from .models import AcmCurrent, AcmDesired, AcmPlan
def _normalize_domain(value: str) -> str:
    """Return *value* in the canonical form used for domain comparison.

    The input is coerced to ``str``, surrounding whitespace is stripped, the
    text is lower-cased, and a single trailing dot (the DNS root label) is
    removed. Only one dot is removed, so ``"x.."`` becomes ``"x."``.

    No IDNA/punycode conversion or syntax validation happens here; the result
    is only meant for equality checks between two normalized names.
    """
    cleaned = str(value).strip().lower()
    return cleaned[:-1] if cleaned.endswith(".") else cleaned
def _domain_matches(desired: AcmDesired, current: AcmCurrent) -> bool:
    """Return True when the existing certificate is for the desired FQDN.

    Both names are normalized with ``_normalize_domain`` and compared for
    exact equality. A missing or empty ``current.domain_name`` never matches.
    There is no wildcard or SAN handling: ``*.example.com`` does not match
    ``www.example.com``.
    """
    # dxaws-acm issues one certificate per FQDN (no SAN bundles).
    wanted = _normalize_domain(desired.domain_name)
    found = _normalize_domain(current.domain_name or "")
    if not found:
        return False
    return wanted == found
def plan_acm(*, desired: AcmDesired, current: AcmCurrent) -> AcmPlan:
    """Decide the next lifecycle action for an ACM certificate.

    Decision table, evaluated top to bottom:

    * no existing certificate             -> ``request``    (wait)
    * ``ISSUED`` and domain matches       -> ``noop``       (no wait)
    * ``PENDING_VALIDATION``              -> ``ensure_dns`` (wait)
    * ``FAILED``                          -> ``recreate``   (wait)
    * any other state of an existing cert -> ``wait``       (wait)

    For an existing certificate the plan carries its ARN and a copy of its
    validation records. The ``request`` plan leaves both at the ``AcmPlan``
    defaults.
    """
    if not current.exists:
        return AcmPlan(
            desired=desired,
            current=current,
            action="request",
            wait=True,
            reason="no_matching_certificate",
        )

    status = current.status

    # NOTE(review): the domain is compared only in the ISSUED branch. An
    # ISSUED certificate whose domain differs from the desired one falls
    # through to the catch-all and is planned as "wait" / "status_ISSUED".
    # Confirm that `current` is always looked up by the desired domain.
    if status == "ISSUED" and _domain_matches(desired, current):
        action, wait, reason = "noop", False, "existing_certificate_issued"
    elif status == "PENDING_VALIDATION":
        # NOTE(review): the validation records are taken from `current`
        # without a domain check. Presumably the executor publishes them in
        # DNS, so they must belong to the certificate for the desired domain.
        # Verify against the caller.
        action, wait, reason = "ensure_dns", True, "pending_validation"
    elif status == "FAILED":
        action, wait, reason = "recreate", True, "certificate_failed"
    else:
        # NOTE(review): every other status lands here. ACM also reports
        # terminal states such as EXPIRED, REVOKED, VALIDATION_TIMED_OUT and
        # INACTIVE, and waiting on those cannot succeed. Confirm the executor
        # bounds the wait and reports the status.
        action, wait, reason = "wait", True, f"status_{status or 'unknown'}"

    return AcmPlan(
        desired=desired,
        current=current,
        action=action,
        certificate_arn=current.certificate_arn,
        wait=wait,
        reason=reason,
        # Copy the list so the plan does not alias the records on `current`.
        validation_records=list(current.validation_records),
    )