from __future__ import annotations
from dataclasses import dataclass
from .executor import execute
from .models import (
DistributionCurrent,
DistributionDesired,
DistributionResult,
Plan,
)
from .planner import plan as build_plan
from .planner import resolve_oac_name
from .providers.aws import AwsProvider
[docs]
@dataclass(frozen=True)
class ModulePlanResult:
desired: DistributionDesired
current: DistributionCurrent
plan: Plan
[docs]
@dataclass(frozen=True)
class ModuleApplyResult:
desired: DistributionDesired
current: DistributionCurrent
plan: Plan
result: DistributionResult
[docs]
class Module:
"""dxaws-cloudfront convergence module.
Note: This is NOT a DxAwsInfraModuleBase module. It does not deploy stacks.
It converges a CloudFront distribution directly via AWS APIs.
"""
def __init__(self, *, provider: AwsProvider) -> None:
self.provider = provider
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
[docs]
def get_current(self, desired: DistributionDesired) -> DistributionCurrent:
"""Discover current state for a distribution based on deterministic identity.
MVP strategy:
- find by primary alias (first entry in desired.aliases)
- if found, fetch config + etag and normalize into DistributionCurrent
- best-effort tags if ARN is available
Note: comment-based discovery is intentionally not supported.
"""
primary_alias = (desired.aliases[0] if getattr(desired, "aliases", None) else None) or desired.name
ref = self.provider.find_distribution_by_alias(primary_alias)
if not ref:
return DistributionCurrent(exists=False, name=desired.name)
cfg, etag = self.provider.get_distribution(ref.id)
# Normalize fields we care about for planner comparisons.
aliases_items = ((cfg.get("Aliases") or {}).get("Items")) or []
origin_items = ((cfg.get("Origins") or {}).get("Items")) or []
origin_domain = (origin_items[0].get("DomainName") if origin_items else None)
dcb = cfg.get("DefaultCacheBehavior") or {}
vpp = dcb.get("ViewerProtocolPolicy")
vc = cfg.get("ViewerCertificate") or {}
acm_arn = vc.get("ACMCertificateArn")
using_default = bool(vc.get("CloudFrontDefaultCertificate")) if "CloudFrontDefaultCertificate" in vc else None
tags: dict[str, str] | None = None
if ref.arn:
try:
tags = self.provider.list_tags(ref.arn)
except Exception:
tags = None
return DistributionCurrent(
exists=True,
name=desired.name,
id=ref.id,
arn=ref.arn,
domain_name=ref.domain_name,
status=ref.status,
enabled=cfg.get("Enabled"),
comment=cfg.get("Comment"),
aliases=tuple(aliases_items) if aliases_items else (),
origin_domain_name=origin_domain,
default_root_object=cfg.get("DefaultRootObject"),
viewer_protocol_policy=vpp,
acm_certificate_arn=acm_arn,
using_default_certificate=using_default,
price_class=cfg.get("PriceClass"),
http_version=cfg.get("HttpVersion"),
ipv6_enabled=cfg.get("IsIPV6Enabled"),
tags=tags,
raw_config=cfg,
etag=etag,
)
[docs]
def plan(self, desired: DistributionDesired, *, wait_deployed: bool = True) -> ModulePlanResult:
cur = self.get_current(desired)
# Best-effort: if OAC already exists, provide its id to the planner so it can
# synth configs referencing it. If missing, planner will include CREATE_OAC.
oac_id = None
try:
o = self.provider.find_oac_by_name(resolve_oac_name(desired))
if o:
oac_id = o.id
except Exception:
oac_id = None
p = build_plan(desired, cur, oac_id=oac_id, wait_deployed=wait_deployed)
return ModulePlanResult(desired=desired, current=cur, plan=p)
[docs]
def apply(self, desired: DistributionDesired, *, wait_deployed: bool = True) -> ModuleApplyResult:
pr = self.plan(desired, wait_deployed=wait_deployed)
exec_out = execute(self.provider, pr.plan)
return ModuleApplyResult(
desired=desired,
current=pr.current,
plan=pr.plan,
result=exec_out.result,
)
# Back-compat / nicer naming in callers/tests
CloudFrontModule = Module