Source code for dxaws_cloudfront.module

from __future__ import annotations

import logging
from dataclasses import dataclass

from .executor import execute
from .models import (
    DistributionCurrent,
    DistributionDesired,
    DistributionResult,
    Plan,
)
from .planner import plan as build_plan
from .planner import resolve_oac_name
from .providers.aws import AwsProvider


@dataclass(frozen=True)
class ModulePlanResult:
    """Immutable bundle returned by ``Module.plan``.

    Attributes:
        desired: The distribution specification the caller asked for.
        current: The state discovered for that distribution before planning.
        plan: The plan built by the planner from ``desired`` and ``current``.
    """

    desired: DistributionDesired
    current: DistributionCurrent
    plan: Plan
@dataclass(frozen=True)
class ModuleApplyResult:
    """Immutable bundle returned by ``Module.apply``.

    Attributes:
        desired: The distribution specification the caller asked for.
        current: The state discovered during planning, i.e. before the plan
            was executed.
        plan: The plan that was executed.
        result: The ``result`` reported by the executor for that plan.
    """

    desired: DistributionDesired
    current: DistributionCurrent
    plan: Plan
    result: DistributionResult
class Module:
    """dxaws-cloudfront convergence module.

    Note:
        This is NOT a DxAwsInfraModuleBase module. It does not deploy stacks.
        It converges a CloudFront distribution directly via AWS APIs.
    """

    # Reports best-effort lookups that fail, so the fallback to ``None`` is
    # visible to operators instead of being swallowed silently.
    _logger = logging.getLogger(__name__)

    def __init__(self, *, provider: AwsProvider) -> None:
        """Store the provider used for every AWS call made by this module."""
        self.provider = provider

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_current(self, desired: DistributionDesired) -> DistributionCurrent:
        """Discover current state for a distribution based on deterministic identity.

        MVP strategy:
          - find by primary alias (first entry in ``desired.aliases``; when no
            aliases are set, ``desired.name`` is used as the lookup key)
          - if found, fetch config + etag and normalize into DistributionCurrent
          - best-effort tags if ARN is available

        Note: comment-based discovery is intentionally not supported.

        Args:
            desired: The desired distribution; only its ``aliases`` and
                ``name`` are read here.

        Returns:
            ``DistributionCurrent(exists=False, ...)`` when the provider finds
            no distribution for the primary alias, otherwise a populated
            ``DistributionCurrent`` with ``exists=True``. ``tags`` is ``None``
            when there is no ARN or the tag lookup failed.
        """
        primary_alias = (
            desired.aliases[0] if getattr(desired, "aliases", None) else None
        ) or desired.name

        ref = self.provider.find_distribution_by_alias(primary_alias)
        if not ref:
            return DistributionCurrent(exists=False, name=desired.name)

        cfg, etag = self.provider.get_distribution(ref.id)

        # Normalize fields we care about for planner comparisons.
        normalized = self._normalize_config(cfg)

        tags: dict[str, str] | None = None
        if ref.arn:
            try:
                tags = self.provider.list_tags(ref.arn)
            except Exception as exc:
                # Tags are best-effort: keep going without them, but leave a trace.
                self._logger.warning(
                    "Could not list tags for %s; continuing without tags: %s",
                    ref.arn,
                    exc,
                )
                tags = None

        return DistributionCurrent(
            exists=True,
            name=desired.name,
            id=ref.id,
            arn=ref.arn,
            domain_name=ref.domain_name,
            status=ref.status,
            tags=tags,
            raw_config=cfg,
            etag=etag,
            **normalized,
        )

    def plan(
        self, desired: DistributionDesired, *, wait_deployed: bool = True
    ) -> ModulePlanResult:
        """Discover current state and build a plan to reach ``desired``.

        Args:
            desired: The desired distribution.
            wait_deployed: Forwarded unchanged to the planner.

        Returns:
            A ``ModulePlanResult`` holding ``desired``, the discovered current
            state, and the plan produced by the planner.
        """
        cur = self.get_current(desired)

        # Best-effort: if OAC already exists, provide its id to the planner so it can
        # synth configs referencing it. If missing, planner will include CREATE_OAC.
        oac_id = None
        try:
            oac = self.provider.find_oac_by_name(resolve_oac_name(desired))
            if oac:
                oac_id = oac.id
        except Exception as exc:
            self._logger.warning(
                "Could not look up existing OAC; planning without an OAC id: %s",
                exc,
            )
            oac_id = None

        built = build_plan(desired, cur, oac_id=oac_id, wait_deployed=wait_deployed)
        return ModulePlanResult(desired=desired, current=cur, plan=built)

    def apply(
        self, desired: DistributionDesired, *, wait_deployed: bool = True
    ) -> ModuleApplyResult:
        """Plan and then execute the plan against the provider.

        Args:
            desired: The desired distribution.
            wait_deployed: Forwarded unchanged to ``plan``.

        Returns:
            A ``ModuleApplyResult``. Its ``current`` is the state captured
            during planning (before execution); ``result`` is the executor's
            reported result.
        """
        pr = self.plan(desired, wait_deployed=wait_deployed)
        exec_out = execute(self.provider, pr.plan)
        return ModuleApplyResult(
            desired=desired,
            current=pr.current,
            plan=pr.plan,
            result=exec_out.result,
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _normalize_config(cfg: dict) -> dict[str, object]:
        """Flatten a distribution config into ``DistributionCurrent`` keyword args.

        Missing or ``None`` sub-sections are treated as empty. Only the first
        origin's ``DomainName`` is reported. ``using_default_certificate`` is
        ``None`` when the viewer certificate section does not carry the
        ``CloudFrontDefaultCertificate`` key at all, so "unknown" stays
        distinguishable from an explicit ``False``.
        """
        aliases = (cfg.get("Aliases") or {}).get("Items") or []
        origins = (cfg.get("Origins") or {}).get("Items") or []
        behavior = cfg.get("DefaultCacheBehavior") or {}
        certificate = cfg.get("ViewerCertificate") or {}

        if "CloudFrontDefaultCertificate" in certificate:
            using_default = bool(certificate.get("CloudFrontDefaultCertificate"))
        else:
            using_default = None

        return {
            "enabled": cfg.get("Enabled"),
            "comment": cfg.get("Comment"),
            "aliases": tuple(aliases),
            "origin_domain_name": origins[0].get("DomainName") if origins else None,
            "default_root_object": cfg.get("DefaultRootObject"),
            "viewer_protocol_policy": behavior.get("ViewerProtocolPolicy"),
            "acm_certificate_arn": certificate.get("ACMCertificateArn"),
            "using_default_certificate": using_default,
            "price_class": cfg.get("PriceClass"),
            "http_version": cfg.get("HttpVersion"),
            "ipv6_enabled": cfg.get("IsIPV6Enabled"),
        }
# Back-compat / nicer naming in callers/tests
CloudFrontModule = Module