Source code for dxaws_cloudfront.manager

"""Manager for dxaws-cloudfront (Base Manager Pattern)."""

from __future__ import annotations

import time
from dataclasses import dataclass, field, replace
from typing import Any

from dxaws_core.o11y import O11y

from dxaws_cloudfront.constants import CLOUDFRONT_HOSTED_ZONE_ID
from dxaws_cloudfront.executor import execute
from dxaws_cloudfront.models import DistributionCurrent, DistributionDesired, DistributionResult, ManagerResult, Plan
from dxaws_cloudfront.planner import plan as build_plan
from dxaws_cloudfront.planner import resolve_oac_name
from dxaws_cloudfront.providers.base import ProviderBase
from dxaws_cloudfront.providers.aws import AwsProvider
from dxaws_core.aws_session import AwsSession


[docs] @dataclass(frozen=True, slots=True, kw_only=True) class ApplyOptions: max_wait_seconds: int = 1800 poll_interval_seconds: int = 30 emit_events: bool = True
[docs] @dataclass(frozen=True, slots=True, kw_only=True) class ExecuteOptions: apply_options: ApplyOptions | None = None emit_events: bool = True
[docs] @dataclass(frozen=True, slots=True, kw_only=True) class CloudFrontManager: provider: ProviderBase | None = None o11y: O11y = field(default_factory=O11y.noop) def __post_init__(self) -> None: # Allow manager to be constructed without explicit provider if self.provider is None: object.__setattr__(self, "provider", AwsProvider(aws=AwsSession())) def _emit(self, event: str, **fields: Any) -> None: self.o11y.info(event, **fields) def _plan_action(self, plan: Plan, current: DistributionCurrent) -> str: types = [a.type for a in plan.actions] if any(t.value == "create_distribution" for t in types): return "create" if any(t.value == "update_distribution" for t in types): return "update" if any(t.value == "tag_distribution" for t in types): return "update" if any(t.value in ("disable_distribution", "delete_distribution") for t in types): return "destroy" if not plan.actions and current.exists and current.status and current.status != "Deployed": return "wait" return "noop"
[docs] def get_current(self, desired: DistributionDesired) -> DistributionCurrent: ref = None aliases = list(desired.aliases or ()) if aliases and hasattr(self.provider, "find_distribution_by_alias"): try: ref = self.provider.find_distribution_by_alias(aliases[0]) except Exception: ref = None if not ref: return DistributionCurrent(exists=False, name=desired.name) cfg, etag = self.provider.get_distribution(ref.id) aliases_items = ((cfg.get("Aliases") or {}).get("Items")) or [] origin_items = ((cfg.get("Origins") or {}).get("Items")) or [] origin_domain = (origin_items[0].get("DomainName") if origin_items else None) dcb = cfg.get("DefaultCacheBehavior") or {} vpp = dcb.get("ViewerProtocolPolicy") vc = cfg.get("ViewerCertificate") or {} acm_arn = vc.get("ACMCertificateArn") using_default = ( bool(vc.get("CloudFrontDefaultCertificate")) if "CloudFrontDefaultCertificate" in vc else None ) tags: dict[str, str] | None = None if ref.arn and hasattr(self.provider, "list_tags"): try: tags = self.provider.list_tags(ref.arn) except Exception: tags = None return DistributionCurrent( exists=True, name=desired.name, id=ref.id, arn=ref.arn, domain_name=ref.domain_name, status=ref.status, enabled=cfg.get("Enabled"), comment=cfg.get("Comment"), aliases=tuple(aliases_items) if aliases_items else (), origin_domain_name=origin_domain, default_root_object=cfg.get("DefaultRootObject"), viewer_protocol_policy=vpp, acm_certificate_arn=acm_arn, using_default_certificate=using_default, price_class=cfg.get("PriceClass"), http_version=cfg.get("HttpVersion"), ipv6_enabled=cfg.get("IsIPV6Enabled"), tags=tags, raw_config=cfg, etag=etag, )
[docs] def plan(self, desired: DistributionDesired, current: DistributionCurrent | None = None) -> Plan: current = current or self.get_current(desired) self._emit( "manager.plan.start", module="dxaws-cloudfront", aliases=list(desired.aliases), origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name, ) # Best-effort: provide OAC id if it exists. oac_id = None try: o = self.provider.find_oac_by_name(resolve_oac_name(desired)) if o: oac_id = o.id except Exception: oac_id = None p = build_plan(desired, current, oac_id=oac_id, wait_deployed=False) action = self._plan_action(p, current) self._emit( "manager.plan.done", module="dxaws-cloudfront", aliases=list(desired.aliases), origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name, action=action, ) return p
def _wait_deployed(self, dist_id: str, *, options: ApplyOptions, desired: DistributionDesired) -> None: start = time.monotonic() poll = 0 while True: status = self.provider.get_distribution_status(dist_id) poll += 1 if status == "Deployed": return if time.monotonic() - start >= options.max_wait_seconds: raise TimeoutError("CloudFront distribution did not reach Deployed before timeout") if options.emit_events and (poll == 1 or poll % 4 == 0): self._emit( "manager.apply.progress", module="dxaws-cloudfront", aliases=list(desired.aliases), origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name, action="wait", status=status, poll_count=poll, ) time.sleep(max(options.poll_interval_seconds, 0))
[docs] def apply(self, plan: Plan, *, options: ApplyOptions | None = None) -> DistributionResult: options = options or ApplyOptions() current = plan.current action = self._plan_action(plan, current) if options.emit_events: self._emit( "manager.apply.start", module="dxaws-cloudfront", aliases=list(plan.desired.aliases), origin_domain_name=plan.desired.origin.domain_name or plan.desired.origin.bucket_name, action=action, ) result: DistributionResult | None = None if not plan.actions: if current.exists and current.id: result = DistributionResult( name=plan.desired.name, id=current.id, arn=current.arn, domain_name=current.domain_name, hosted_zone_id=CLOUDFRONT_HOSTED_ZONE_ID, ) if current.status and current.status != "Deployed": self._wait_deployed(current.id, options=options, desired=plan.desired) else: raise RuntimeError("No actions planned and distribution does not exist") else: exec_out = execute(self.provider, plan) result = exec_out.result if any(a.type.value in ("create_distribution", "update_distribution") for a in plan.actions): self._wait_deployed(result.id, options=options, desired=plan.desired) if options.emit_events: self._emit( "manager.apply.done", module="dxaws-cloudfront", aliases=list(plan.desired.aliases), origin_domain_name=plan.desired.origin.domain_name or plan.desired.origin.bucket_name, action=action, outcome="applied", distribution_id=result.id if result else None, ) if result is None: raise RuntimeError("apply did not resolve a distribution result") return result
[docs] def execute(self, desired: DistributionDesired, *, options: ExecuteOptions | None = None) -> ManagerResult: options = options or ExecuteOptions() current = self.get_current(desired) plan = self.plan(desired, current) action = self._plan_action(plan, current) try: # If no changes are required, do not run apply. if plan.is_noop and action == "noop": result = DistributionResult( name=desired.name, id=current.id or "", arn=current.arn, domain_name=current.domain_name, hosted_zone_id=CLOUDFRONT_HOSTED_ZONE_ID, ) outcome = "noop" if options.emit_events: self._emit( "manager.execute.done", module="dxaws-cloudfront", aliases=list(desired.aliases), origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name, action=action, outcome=outcome, distribution_id=result.id, ) return ManagerResult( desired=desired, current=current, plan=plan, result=result, outcome=outcome, ) # Otherwise, apply the plan. result = self.apply(plan, options=options.apply_options) outcome = "applied" if options.emit_events: self._emit( "manager.execute.done", module="dxaws-cloudfront", aliases=list(desired.aliases), origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name, action=action, outcome=outcome, distribution_id=result.id, ) return ManagerResult( desired=desired, current=current, plan=plan, result=result, outcome=outcome, ) except Exception as exc: if options.emit_events: self._emit( "manager.execute.done", module="dxaws-cloudfront", aliases=list(desired.aliases), origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name, action=action, outcome="failed", error=str(exc), ) raise
[docs] def destroy(self, desired: DistributionDesired, *, options: ExecuteOptions | None = None) -> ManagerResult: """Destroy a distribution by converging to desired.present=False. Pass the same `desired` used for creation so aliases can locate the distribution deterministically. """ absent = replace(desired, present=False) return self.execute(absent, options=options)