"""ACM manager implementing the Base Manager Pattern."""
from __future__ import annotations
import os
import time
from dataclasses import dataclass, field
from typing import Any, Protocol, Sequence
from dxaws_core.o11y import O11y
from dxaws_dns.manager import DnsManager
from dxaws_dns.models import DnsRecordDesired
from dxaws_acm.planner import plan_acm
from dxaws_acm.models import AcmCurrent, AcmDesired, AcmManagerResult, AcmOutputs, AcmPlan
from dxaws_acm.providers.base import ProviderBase
[docs]
@dataclass(frozen=True, slots=True, kw_only=True)
class ApplyOptions:
max_wait_seconds: int = 900
poll_interval_seconds: int = 15
dns_ttl: int = 60
emit_events: bool = True
[docs]
@dataclass(frozen=True, slots=True, kw_only=True)
class ExecuteOptions:
apply_options: ApplyOptions | None = None
emit_events: bool = True
@dataclass(frozen=True, slots=True, kw_only=True)
class AcmManager:
    """Manage an ACM certificate through get_current -> plan -> apply.

    ``execute`` chains the three steps. Each step can also be called on its
    own. Certificate operations go through ``provider``; DNS validation
    records are written through ``dns``; events are reported via ``o11y``.
    """

    # Provider is optional for the common case.
    # If omitted, the manager will default to the AWS provider.
    provider: ProviderBase | None = None
    # Optional DNS manager; when omitted a fresh ``DnsManager()`` is created
    # each time one is needed.
    dns: DnsManager | None = None
    # Event sink; every event goes through ``o11y.info``.
    o11y: O11y = field(default_factory=O11y.noop)

    def _provider(self, desired: AcmDesired | None = None) -> ProviderBase:
        """Return the injected provider, or build a default AWS provider.

        When no provider was injected, a new session and provider are built
        on every call. The region is taken from ``desired.region``, then the
        ``AWS_REGION`` / ``AWS_DEFAULT_REGION`` environment variables, and
        finally ``"us-east-1"``.

        Raises:
            ImportError: if none of the known provider class names exists in
                ``dxaws_acm.providers.aws`` (or ``AwsSession`` cannot be
                imported from any of the known locations).
        """
        if self.provider is not None:
            return self.provider

        # Default to AWS provider. We resolve lazily to keep providers out of
        # the common import surface and to survive refactors.
        try:
            from dxaws_core import AwsSession  # type: ignore
        except Exception:  # pragma: no cover
            try:
                from dxaws_core.aws import AwsSession  # type: ignore
            except Exception:
                from dxaws_core.session import AwsSession  # type: ignore

        region = getattr(desired, "region", None) if desired is not None else None
        region = (
            region
            or os.getenv("AWS_REGION")
            or os.getenv("AWS_DEFAULT_REGION")
            or "us-east-1"
        )
        aws = AwsSession(region_name=region)

        # Provider class name has varied during refactors.
        from .providers import aws as aws_mod

        for cls_name in ("AwsProvider", "AwsAcmProvider", "ProviderAws"):
            cls = getattr(aws_mod, cls_name, None)
            if cls is None:
                continue
            try:
                return cls(aws=aws)  # type: ignore[call-arg]
            except TypeError:
                # Some historical providers may accept region.
                try:
                    return cls(aws=aws, region=region)  # type: ignore[call-arg]
                except TypeError:
                    return cls()  # type: ignore[call-arg]

        raise ImportError(
            "dxaws-acm AWS provider not found. Expected one of: AwsProvider, AwsAcmProvider, ProviderAws "
            "in dxaws_acm.providers.aws"
        )

    def _dns(self, desired: AcmDesired | None = None) -> DnsManager:
        """Return the injected DNS manager, or a new default ``DnsManager()``.

        ``desired`` is accepted for signature parity with ``_provider`` but is
        not used.
        """
        if self.dns is not None:
            return self.dns
        return DnsManager()

    def _emit(self, event: str, **fields: Any) -> None:
        """Report ``event`` with ``fields`` through ``o11y.info``."""
        self.o11y.info(event, **fields)

    def _plan_summary(self, plan: AcmPlan) -> dict[str, Any]:
        """Return the plan fields that are attached to emitted events."""
        return {
            "action": plan.action,
            "certificate_arn": plan.certificate_arn,
            "wait": plan.wait,
            "reason": plan.reason,
        }

    def get_current(self, desired: AcmDesired) -> AcmCurrent:
        """Find the existing certificate that best matches ``desired``.

        Every listed certificate is described, and those whose domain set
        contains ``desired.domain_name`` (case-insensitive, trailing dot
        ignored, exact string match) are candidates. The winner is the
        smallest ``(status rank, number of extra domains, ARN)`` tuple, where
        ISSUED ranks before PENDING_VALIDATION and both rank before anything
        else.

        Returns:
            An ``AcmCurrent`` with ``exists=False`` when nothing matches,
            otherwise the details of the selected certificate.
        """
        # Resolve once: without an injected provider every ``_provider`` call
        # builds a new session and provider, which used to happen for each
        # listed certificate.
        provider = self._provider(desired)
        wanted = {desired.domain_name.lower().rstrip(".")}

        try:
            summaries = provider.list_certificates(include_status=True)
        except TypeError:
            # Provider does not accept ``include_status``.
            summaries = provider.list_certificates()

        status_rank = {"ISSUED": 0, "PENDING_VALIDATION": 1}
        best: tuple[int, int, str] | None = None
        best_info = None

        for summary in summaries:
            arn = getattr(summary, "certificate_arn", None)
            if not arn:
                continue

            try:
                info = provider.describe_certificate(certificate_arn=str(arn))
            except Exception as exc:
                # Certificates can disappear between list and describe (e.g.,
                # acceptance drift step). Treat as non-fatal and continue
                # scanning.
                if self.o11y is not None:
                    try:
                        self._emit(
                            "manager.get_current.skip",
                            module="dxaws-acm",
                            certificate_arn=str(arn),
                            reason="describe_failed",
                            error=str(exc),
                        )
                    except Exception:
                        # Telemetry failure must not abort the scan.
                        pass
                continue

            domains = set(getattr(info, "domains", None) or [])
            if not domains:
                primary = getattr(info, "domain_name", None)
                sans = getattr(info, "subject_alternative_names", None) or []
                domains = {d for d in [primary, *sans] if d}
            domains = {str(d).rstrip(".").lower() for d in domains if d}
            if not wanted.issubset(domains):
                continue

            # Prefer the list summary's status. Fall back to the described
            # status so ranking still works when summaries carry none (e.g.
            # the ``list_certificates()`` fallback above).
            status = (
                getattr(summary, "status", "")
                or getattr(info, "status", "")
                or ""
            )
            candidate = (
                status_rank.get(status, 2),
                len(domains - wanted),
                str(arn),
            )
            if best is None or candidate < best:
                best = candidate
                best_info = info

        if best is None or best_info is None:
            return AcmCurrent(
                exists=False,
                certificate_arn=None,
                status=None,
                domain_name=None,
                validation_records=[],
            )

        status = getattr(best_info, "status", None)
        domain_name = getattr(best_info, "domain_name", None)
        records = getattr(best_info, "validation_records", None) or []
        return AcmCurrent(
            exists=True,
            certificate_arn=best[2],
            status=str(status) if status is not None else None,
            domain_name=str(domain_name) if domain_name else None,
            validation_records=list(records),
        )

    def plan(self, desired: AcmDesired, current: AcmCurrent | None = None) -> AcmPlan:
        """Compute the plan that moves ``current`` towards ``desired``.

        ``current`` is looked up with ``get_current`` when not supplied.
        Emits ``manager.plan.start`` and ``manager.plan.done``; the reported
        duration covers only the planning call, not the lookup.
        """
        # Explicit None check so a supplied state object is never replaced
        # just because it happens to be falsy.
        if current is None:
            current = self.get_current(desired)

        start = time.monotonic()
        self._emit(
            "manager.plan.start",
            module="dxaws-acm",
            domain_name=desired.domain_name,
            region=desired.region,
        )
        plan = plan_acm(desired=desired, current=current)
        summary = self._plan_summary(plan)
        self._emit(
            "manager.plan.done",
            module="dxaws-acm",
            domain_name=desired.domain_name,
            region=desired.region,
            action=plan.action,
            duration_ms=int((time.monotonic() - start) * 1000),
            plan_summary=summary,
        )
        return plan

    def _ensure_dns(
        self,
        *,
        validation_records: list[Any],
        ttl: int,
    ) -> list[dict[str, str]]:
        """Write each validation record through the DNS manager.

        Records missing a ``name``, ``type`` or ``value`` are skipped.

        Returns:
            One ``{"name", "type", "value"}`` dict per record that was handed
            to the DNS manager, in input order.
        """
        dns = self._dns(None)
        changes: list[dict[str, str]] = []
        for vr in validation_records:
            name = getattr(vr, "name", None)
            rr_type = getattr(vr, "type", None)
            value = getattr(vr, "value", None)
            if not name or not rr_type or not value:
                continue

            fqdn = str(name)
            desired = DnsRecordDesired(
                record_name=fqdn,
                record_type=str(rr_type).upper(),
                ttl=ttl,
                values=[str(value)],
            )
            dns.execute(desired)
            changes.append(
                {
                    "name": fqdn,
                    "type": desired.record_type,
                    "value": str(value),
                }
            )
        return changes

    def _await_validation_records(
        self,
        *,
        provider: ProviderBase,
        plan: AcmPlan,
        cert_arn: str,
        info: Any,
        options: ApplyOptions,
    ) -> tuple[list[Any], Any]:
        """Poll until the certificate exposes DNS validation records.

        Returns:
            ``(records, info)`` where ``info`` is the most recent successful
            describe result (or the one passed in).

        Raises:
            TimeoutError: if no records appear within
                ``options.max_wait_seconds``.
        """
        started = time.monotonic()
        poll = 0
        while True:
            poll += 1
            try:
                records = list(
                    provider.get_dns_validation_records(certificate_arn=cert_arn)
                )
            except Exception:
                # Best effort: a failed lookup counts as "not published yet".
                records = []

            # Fallback to whatever the last describe returned (best-effort)
            if not records and info is not None:
                records = list(getattr(info, "validation_records", []) or [])
            if records:
                return records, info

            if time.monotonic() - started >= options.max_wait_seconds:
                raise TimeoutError(
                    "Timed out waiting for ACM DNS validation records to appear"
                )

            # Refresh describe occasionally while we wait
            if poll == 1 or poll % 3 == 0:
                try:
                    info = provider.describe_certificate(certificate_arn=cert_arn)
                except Exception:
                    # Keep the previous describe result and retry next round.
                    pass

            if options.emit_events and (poll == 1 or poll % 4 == 0):
                self._emit(
                    "manager.apply.progress",
                    module="dxaws-acm",
                    domain_name=plan.desired.domain_name,
                    region=plan.desired.region,
                    action="dns_wait",
                    status=getattr(info, "status", None) if info else None,
                    poll_count=poll,
                )
            time.sleep(max(options.poll_interval_seconds, 0))

    def _await_issued(
        self,
        *,
        provider: ProviderBase,
        plan: AcmPlan,
        cert_arn: str,
        options: ApplyOptions,
    ) -> Any:
        """Poll ``describe_certificate`` until the status is ``"ISSUED"``.

        Returns:
            The final status value (``"ISSUED"``).

        Raises:
            TimeoutError: if the certificate is not ISSUED within
                ``options.max_wait_seconds``.
        """
        started = time.monotonic()
        poll = 0
        while True:
            info = provider.describe_certificate(certificate_arn=cert_arn)
            status = getattr(info, "status", None)
            poll += 1
            if status == "ISSUED":
                return status

            if time.monotonic() - started >= options.max_wait_seconds:
                raise TimeoutError("ACM certificate did not reach ISSUED before timeout")

            if options.emit_events and (poll == 1 or poll % 4 == 0):
                self._emit(
                    "manager.apply.progress",
                    module="dxaws-acm",
                    domain_name=plan.desired.domain_name,
                    region=plan.desired.region,
                    action="wait",
                    status=status,
                    poll_count=poll,
                )
            time.sleep(max(options.poll_interval_seconds, 0))

    def apply(self, plan: AcmPlan, *, options: ApplyOptions | None = None) -> AcmOutputs:
        """Carry out ``plan`` and return the resulting certificate ARN/status.

        Steps: short-circuit no-op plans, request a certificate when the
        action is ``"request"``, wait for DNS validation records if the plan
        carries none, write those records through the DNS manager, and, when
        ``plan.wait`` is set, poll until the certificate is ISSUED.

        Raises:
            RuntimeError: for a ``"recreate"`` plan, or when no certificate
                ARN/status could be resolved.
            TimeoutError: when either polling phase exceeds
                ``options.max_wait_seconds``.
        """
        if options is None:
            options = ApplyOptions()

        if plan.is_noop and plan.current.certificate_arn and plan.current.status:
            return AcmOutputs(
                certificate_arn=plan.current.certificate_arn,
                status=plan.current.status,
            )
        if plan.action == "recreate":
            raise RuntimeError("certificate status is FAILED; manual intervention required")

        if options.emit_events:
            self._emit(
                "manager.apply.start",
                module="dxaws-acm",
                domain_name=plan.desired.domain_name,
                region=plan.desired.region,
                action=plan.action,
            )

        provider = self._provider(plan.desired)
        cert_arn = plan.certificate_arn
        info = None
        if plan.action == "request":
            cert_arn = provider.request_dns_validated_certificate(
                primary_domain=plan.desired.domain_name,
                sans=[],
                tags=plan.desired.tags,
            )
            if cert_arn:
                info = provider.describe_certificate(certificate_arn=cert_arn)

        validation_records = list(plan.validation_records)
        # ACM often takes a short time to publish DNS validation ResourceRecords.
        if not validation_records and cert_arn:
            validation_records, info = self._await_validation_records(
                provider=provider,
                plan=plan,
                cert_arn=cert_arn,
                info=info,
                options=options,
            )

        dns_changes: list[dict[str, str]] = []
        if validation_records:
            dns_changes = self._ensure_dns(
                validation_records=validation_records,
                ttl=options.dns_ttl,
            )

        status = getattr(info, "status", None) if info else None
        if plan.wait and cert_arn:
            status = self._await_issued(
                provider=provider,
                plan=plan,
                cert_arn=cert_arn,
                options=options,
            )

        if status is None:
            # A non-request plan that does not wait never describes the
            # certificate; use the status observed at plan time instead of
            # failing unconditionally.
            status = getattr(getattr(plan, "current", None), "status", None)

        # Validate before reporting success so a failed apply never emits an
        # "applied" outcome.
        if cert_arn is None or status is None:
            raise RuntimeError("ACM apply did not resolve a certificate ARN/status")

        if options.emit_events:
            self._emit(
                "manager.apply.done",
                module="dxaws-acm",
                domain_name=plan.desired.domain_name,
                region=plan.desired.region,
                action=plan.action,
                outcome="applied",
                dns_changes=dns_changes,
            )
        return AcmOutputs(certificate_arn=cert_arn, status=str(status))

    def execute(self, desired: AcmDesired, *, options: ExecuteOptions | None = None) -> AcmManagerResult:
        """Run get_current, plan and apply for ``desired`` in one call.

        Emits ``manager.execute.done`` with outcome ``"noop"``, ``"applied"``
        or ``"failed"`` when ``options.emit_events`` is set. Any exception
        raised while applying is re-raised after the failure event.

        NOTE(review): ``options.emit_events`` gates only the execute event;
        ``plan`` always emits, and ``apply`` follows
        ``options.apply_options`` -- confirm this is intended.
        """
        if options is None:
            options = ExecuteOptions()

        current = self.get_current(desired)
        plan = self.plan(desired, current)
        summary = self._plan_summary(plan)
        try:
            outputs = self.apply(plan, options=options.apply_options)
            outcome = "noop" if plan.is_noop else "applied"
            if options.emit_events:
                self._emit(
                    "manager.execute.done",
                    module="dxaws-acm",
                    domain_name=desired.domain_name,
                    region=desired.region,
                    action=plan.action,
                    outcome=outcome,
                    plan_summary=summary,
                )
            return AcmManagerResult(
                desired=desired,
                current=current,
                plan=plan,
                outputs=outputs,
                outcome=outcome,
            )
        except Exception as exc:
            if options.emit_events:
                self._emit(
                    "manager.execute.done",
                    module="dxaws-acm",
                    domain_name=desired.domain_name,
                    region=desired.region,
                    action=plan.action,
                    outcome="failed",
                    plan_summary=summary,
                    error=str(exc),
                )
            raise