Source code for dxaws_acm.manager

"""ACM manager implementing the Base Manager Pattern."""

from __future__ import annotations

import os
import time
from dataclasses import dataclass, field
from typing import Any, Protocol, Sequence

from dxaws_core.o11y import O11y
from dxaws_dns.manager import DnsManager
from dxaws_dns.models import DnsRecordDesired

from dxaws_acm.planner import plan_acm
from dxaws_acm.models import AcmCurrent, AcmDesired, AcmManagerResult, AcmOutputs, AcmPlan
from dxaws_acm.providers.base import ProviderBase


[docs] @dataclass(frozen=True, slots=True, kw_only=True) class ApplyOptions: max_wait_seconds: int = 900 poll_interval_seconds: int = 15 dns_ttl: int = 60 emit_events: bool = True
[docs] @dataclass(frozen=True, slots=True, kw_only=True) class ExecuteOptions: apply_options: ApplyOptions | None = None emit_events: bool = True
@dataclass(frozen=True, slots=True, kw_only=True)
class AcmManager:
    """Manage an ACM certificate through get_current / plan / apply / execute.

    All collaborators are optional: a missing ``provider`` is resolved lazily
    to the AWS provider, a missing ``dns`` to a default ``DnsManager()``, and
    ``o11y`` defaults to ``O11y.noop``.
    """

    # Provider is optional for the common case.
    # If omitted, the manager will default to the AWS provider.
    provider: ProviderBase | None = None
    dns: DnsManager | None = None
    o11y: O11y = field(default_factory=O11y.noop)

    def _provider(self, desired: AcmDesired | None = None) -> ProviderBase:
        """Return the injected provider, or build a default AWS provider.

        The region is taken from ``desired.region``, then ``AWS_REGION``, then
        ``AWS_DEFAULT_REGION``, and finally ``"us-east-1"``.

        NOTE: the instance is frozen, so the default provider is not cached;
        every call without an injected provider builds a new session. Callers
        inside this class resolve it once per operation.

        Raises:
            ImportError: if no known provider class exists in
                ``dxaws_acm.providers.aws``.
        """
        if self.provider is not None:
            return self.provider

        # Default to AWS provider. We resolve lazily to keep providers out of the
        # common import surface and to survive refactors.
        try:
            from dxaws_core import AwsSession  # type: ignore
        except Exception:  # pragma: no cover
            try:
                from dxaws_core.aws import AwsSession  # type: ignore
            except Exception:
                from dxaws_core.session import AwsSession  # type: ignore

        region = None
        if desired is not None:
            region = getattr(desired, "region", None)
        region = (
            region
            or os.getenv("AWS_REGION")
            or os.getenv("AWS_DEFAULT_REGION")
            or "us-east-1"
        )

        aws = AwsSession(region_name=region)

        # Provider class name has varied during refactors.
        from .providers import aws as aws_mod

        for cls_name in ("AwsProvider", "AwsAcmProvider", "ProviderAws"):
            cls = getattr(aws_mod, cls_name, None)
            if cls is None:
                continue
            try:
                return cls(aws=aws)  # type: ignore[call-arg]
            except TypeError:
                # Some historical providers may accept region.
                try:
                    return cls(aws=aws, region=region)  # type: ignore[call-arg]
                except TypeError:
                    return cls()  # type: ignore[call-arg]

        raise ImportError(
            "dxaws-acm AWS provider not found. Expected one of: AwsProvider, AwsAcmProvider, ProviderAws "
            "in dxaws_acm.providers.aws"
        )

    def _dns(self, desired: AcmDesired | None = None) -> DnsManager:
        """Return the injected DNS manager, or a default ``DnsManager()``."""
        if self.dns is not None:
            return self.dns
        return DnsManager()

    def _emit(self, event: str, **fields: Any) -> None:
        """Forward ``event`` and its fields to ``self.o11y.info``."""
        self.o11y.info(event, **fields)

    def _plan_summary(self, plan: AcmPlan) -> dict[str, Any]:
        """Return the subset of plan attributes attached to emitted events."""
        return {
            "action": plan.action,
            "certificate_arn": plan.certificate_arn,
            "wait": plan.wait,
            "reason": plan.reason,
        }

    def get_current(self, desired: AcmDesired) -> AcmCurrent:
        """Find the existing certificate that best covers ``desired.domain_name``.

        Every listed certificate is described; those whose domains include the
        desired domain (compared lower-cased, trailing dot stripped) are ranked
        by (summary status: ISSUED, then PENDING_VALIDATION, then anything
        else; number of extra domains; ARN) and the lowest tuple wins.

        Returns:
            An ``AcmCurrent`` with ``exists=False`` when nothing matches,
            otherwise one populated from the winning certificate's description.
        """
        # Find matching certificate (best effort selection)
        domain = desired.domain_name
        desired_domains = {domain.lower().rstrip(".")}

        # Resolve the provider once: without an injected provider each
        # ``_provider`` call builds a fresh session, which would otherwise
        # happen again for every certificate described below.
        provider = self._provider(desired)

        try:
            summaries = provider.list_certificates(include_status=True)
        except TypeError:
            # Provider does not accept ``include_status``.
            summaries = provider.list_certificates()

        status_ranks = {"ISSUED": 0, "PENDING_VALIDATION": 1}
        best: tuple[int, int, str] | None = None
        best_info = None

        for summary in summaries:
            arn = getattr(summary, "certificate_arn", None)
            summary_status = getattr(summary, "status", "") or ""
            if not arn:
                continue

            try:
                info = provider.describe_certificate(certificate_arn=str(arn))
            except Exception as exc:
                # Certificates can disappear between list and describe (e.g., acceptance drift step).
                # Treat as non-fatal and continue scanning.
                if self.o11y is not None:
                    try:
                        self._emit(
                            "manager.get_current.skip",
                            module="dxaws-acm",
                            certificate_arn=str(arn),
                            reason="describe_failed",
                            error=str(exc),
                        )
                    except Exception:
                        # Observability must never break the scan.
                        pass
                continue

            domains = set(getattr(info, "domains", None) or [])
            if not domains:
                primary = getattr(info, "domain_name", None)
                sans = getattr(info, "subject_alternative_names", None) or []
                domains = {d for d in [primary, *sans] if d}
            domains = {str(d).rstrip(".").lower() for d in domains if d}

            if not desired_domains.issubset(domains):
                continue

            extra = len(domains - desired_domains)
            candidate = (status_ranks.get(summary_status, 2), extra, str(arn))
            if best is None or candidate < best:
                best = candidate
                best_info = info

        if best is None or best_info is None:
            return AcmCurrent(
                exists=False,
                certificate_arn=None,
                status=None,
                domain_name=None,
                validation_records=[],
            )

        status = getattr(best_info, "status", None)
        domain_name = getattr(best_info, "domain_name", None)
        records = getattr(best_info, "validation_records", None) or []
        return AcmCurrent(
            exists=True,
            certificate_arn=best[2],
            status=str(status) if status is not None else None,
            domain_name=str(domain_name) if domain_name else None,
            validation_records=list(records),
        )

    def plan(self, desired: AcmDesired, current: AcmCurrent | None = None) -> AcmPlan:
        """Compute the plan for ``desired`` and emit plan.start / plan.done.

        ``current`` is looked up with ``get_current`` only when it is ``None``.
        """
        # Explicit None check: a supplied snapshot is always honoured, whatever
        # its truthiness.
        if current is None:
            current = self.get_current(desired)

        start = time.monotonic()
        self._emit(
            "manager.plan.start",
            module="dxaws-acm",
            domain_name=desired.domain_name,
            region=desired.region,
        )
        plan = plan_acm(desired=desired, current=current)
        summary = self._plan_summary(plan)
        self._emit(
            "manager.plan.done",
            module="dxaws-acm",
            domain_name=desired.domain_name,
            region=desired.region,
            action=plan.action,
            duration_ms=int((time.monotonic() - start) * 1000),
            plan_summary=summary,
        )
        return plan

    def _ensure_dns(
        self,
        *,
        validation_records: list[Any],
        ttl: int,
    ) -> list[dict[str, str]]:
        """Execute one DNS record per validation record via the DNS manager.

        Records missing a name, type or value are skipped.

        Returns:
            One ``{"name", "type", "value"}`` dict per record that was executed.
        """
        dns = self._dns(None)
        changes: list[dict[str, str]] = []
        for vr in validation_records:
            name = getattr(vr, "name", None)
            rr_type = getattr(vr, "type", None)
            value = getattr(vr, "value", None)
            if not name or not rr_type or not value:
                continue

            fqdn = str(name)
            desired = DnsRecordDesired(
                record_name=fqdn,
                record_type=str(rr_type).upper(),
                ttl=ttl,
                values=[str(value)],
            )
            dns.execute(desired)
            changes.append(
                {
                    "name": fqdn,
                    "type": desired.record_type,
                    "value": str(value),
                }
            )
        return changes

    def apply(self, plan: AcmPlan, *, options: ApplyOptions | None = None) -> AcmOutputs:
        """Carry out ``plan`` and return the certificate ARN and status.

        Steps: request the certificate when the action is "request"; wait for
        DNS validation records if the plan carries none; ensure those records
        through the DNS manager; and, when ``plan.wait`` is set, poll until the
        certificate is ISSUED.

        Raises:
            RuntimeError: the action is "recreate"; the certificate reaches
                FAILED or VALIDATION_TIMED_OUT while waiting; or no
                ARN/status could be resolved.
            TimeoutError: validation records or the ISSUED status did not show
                up within ``options.max_wait_seconds``.
        """
        if options is None:
            options = ApplyOptions()

        if plan.is_noop and plan.current.certificate_arn and plan.current.status:
            return AcmOutputs(
                certificate_arn=plan.current.certificate_arn,
                status=plan.current.status,
            )

        if plan.action == "recreate":
            raise RuntimeError("certificate status is FAILED; manual intervention required")

        if options.emit_events:
            self._emit(
                "manager.apply.start",
                module="dxaws-acm",
                domain_name=plan.desired.domain_name,
                region=plan.desired.region,
                action=plan.action,
            )

        provider = self._provider(plan.desired)
        cert_arn = plan.certificate_arn
        info = None

        if plan.action == "request":
            cert_arn = provider.request_dns_validated_certificate(
                primary_domain=plan.desired.domain_name,
                sans=[],
                tags=plan.desired.tags,
            )
            if cert_arn:
                info = provider.describe_certificate(certificate_arn=cert_arn)

        validation_records = list(plan.validation_records)

        # ACM often takes a short time to publish DNS validation ResourceRecords.
        if not validation_records and cert_arn:
            start = time.monotonic()
            poll = 0
            while True:
                poll += 1
                try:
                    validation_records = list(
                        provider.get_dns_validation_records(certificate_arn=cert_arn)
                    )
                except Exception:
                    # Best effort: a failed lookup counts as "not published yet".
                    validation_records = []

                # Fallback to whatever the last describe returned (best-effort)
                if not validation_records and info is not None:
                    validation_records = list(getattr(info, "validation_records", []) or [])

                if validation_records:
                    break

                if time.monotonic() - start >= options.max_wait_seconds:
                    raise TimeoutError(
                        "Timed out waiting for ACM DNS validation records to appear"
                    )

                # Refresh describe occasionally while we wait
                if poll == 1 or poll % 3 == 0:
                    try:
                        info = provider.describe_certificate(certificate_arn=cert_arn)
                    except Exception:
                        # Keep the previous description; the next poll retries.
                        pass

                if options.emit_events and (poll == 1 or poll % 4 == 0):
                    self._emit(
                        "manager.apply.progress",
                        module="dxaws-acm",
                        domain_name=plan.desired.domain_name,
                        region=plan.desired.region,
                        action="dns_wait",
                        status=getattr(info, "status", None) if info else None,
                        poll_count=poll,
                    )

                time.sleep(max(options.poll_interval_seconds, 0))

        dns_changes: list[dict[str, str]] = []
        if validation_records:
            dns_changes = self._ensure_dns(
                validation_records=validation_records,
                ttl=options.dns_ttl,
            )

        status = getattr(info, "status", None) if info else None

        if plan.wait and cert_arn:
            # Statuses from which a certificate never becomes ISSUED; polling
            # until the timeout would only delay the inevitable failure.
            dead_statuses = ("FAILED", "VALIDATION_TIMED_OUT")
            start = time.monotonic()
            poll = 0
            while True:
                info = provider.describe_certificate(certificate_arn=cert_arn)
                status = getattr(info, "status", None)
                poll += 1

                if status == "ISSUED":
                    break

                if status in dead_statuses:
                    raise RuntimeError(
                        f"ACM certificate reached terminal status {status}; "
                        "manual intervention required"
                    )

                if time.monotonic() - start >= options.max_wait_seconds:
                    raise TimeoutError("ACM certificate did not reach ISSUED before timeout")

                if options.emit_events and (poll == 1 or poll % 4 == 0):
                    self._emit(
                        "manager.apply.progress",
                        module="dxaws-acm",
                        domain_name=plan.desired.domain_name,
                        region=plan.desired.region,
                        action="wait",
                        status=status,
                        poll_count=poll,
                    )

                time.sleep(max(options.poll_interval_seconds, 0))

        if status is None and cert_arn:
            # The certificate was never described during this run (existing
            # certificate, records supplied by the plan, no wait requested):
            # report the status captured in the plan's snapshot instead of
            # failing after the DNS work has already been done.
            status = getattr(getattr(plan, "current", None), "status", None)

        # Validate before announcing success so a failed apply never emits an
        # "applied" outcome.
        if cert_arn is None or status is None:
            raise RuntimeError("ACM apply did not resolve a certificate ARN/status")

        if options.emit_events:
            self._emit(
                "manager.apply.done",
                module="dxaws-acm",
                domain_name=plan.desired.domain_name,
                region=plan.desired.region,
                action=plan.action,
                outcome="applied",
                dns_changes=dns_changes,
            )

        return AcmOutputs(certificate_arn=cert_arn, status=str(status))

    def execute(self, desired: AcmDesired, *, options: ExecuteOptions | None = None) -> AcmManagerResult:
        """Run get_current, plan and apply for ``desired`` in one call.

        Emits "manager.execute.done" with outcome "noop", "applied" or
        "failed" (unless ``options.emit_events`` is False). Any exception
        raised while applying is re-raised after the "failed" event.
        """
        if options is None:
            options = ExecuteOptions()

        current = self.get_current(desired)
        plan = self.plan(desired, current)
        summary = self._plan_summary(plan)

        try:
            outputs = self.apply(plan, options=options.apply_options)
            outcome = "noop" if plan.is_noop else "applied"
            if options.emit_events:
                self._emit(
                    "manager.execute.done",
                    module="dxaws-acm",
                    domain_name=desired.domain_name,
                    region=desired.region,
                    action=plan.action,
                    outcome=outcome,
                    plan_summary=summary,
                )
            return AcmManagerResult(
                desired=desired,
                current=current,
                plan=plan,
                outputs=outputs,
                outcome=outcome,
            )
        except Exception as exc:
            if options.emit_events:
                self._emit(
                    "manager.execute.done",
                    module="dxaws-acm",
                    domain_name=desired.domain_name,
                    region=desired.region,
                    action=plan.action,
                    outcome="failed",
                    plan_summary=summary,
                    error=str(exc),
                )
            raise