Source code for dxaws_acm.planner

"""Planner for ACM manager lifecycle (v0.1.0)."""

from __future__ import annotations

from .models import AcmCurrent, AcmDesired, AcmPlan


def _normalize_domain(value: str) -> str:
    v = str(value).strip().lower()
    if v.endswith("."):
        v = v[:-1]
    return v


def _domain_matches(desired: AcmDesired, current: AcmCurrent) -> bool:
    # dxaws-acm issues one certificate per FQDN (no SAN bundles).
    desired_domain = _normalize_domain(desired.domain_name)
    current_domain = _normalize_domain(current.domain_name or "")
    return bool(current_domain) and desired_domain == current_domain


[docs] def plan_acm(*, desired: AcmDesired, current: AcmCurrent) -> AcmPlan: if current.exists and current.status == "ISSUED" and _domain_matches(desired, current): return AcmPlan( desired=desired, current=current, action="noop", certificate_arn=current.certificate_arn, wait=False, reason="existing_certificate_issued", validation_records=list(current.validation_records), ) if current.exists and current.status == "PENDING_VALIDATION": return AcmPlan( desired=desired, current=current, action="ensure_dns", certificate_arn=current.certificate_arn, wait=True, reason="pending_validation", validation_records=list(current.validation_records), ) if current.exists and current.status == "FAILED": return AcmPlan( desired=desired, current=current, action="recreate", certificate_arn=current.certificate_arn, wait=True, reason="certificate_failed", validation_records=list(current.validation_records), ) if current.exists: return AcmPlan( desired=desired, current=current, action="wait", certificate_arn=current.certificate_arn, wait=True, reason=f"status_{current.status or 'unknown'}", validation_records=list(current.validation_records), ) return AcmPlan( desired=desired, current=current, action="request", wait=True, reason="no_matching_certificate", )