"""Manager for dxaws-cloudfront (Base Manager Pattern)."""
from __future__ import annotations
import time
from dataclasses import dataclass, field, replace
from typing import Any
from dxaws_core.o11y import O11y
from dxaws_cloudfront.constants import CLOUDFRONT_HOSTED_ZONE_ID
from dxaws_cloudfront.executor import execute
from dxaws_cloudfront.models import DistributionCurrent, DistributionDesired, DistributionResult, ManagerResult, Plan
from dxaws_cloudfront.planner import plan as build_plan
from dxaws_cloudfront.planner import resolve_oac_name
from dxaws_cloudfront.providers.base import ProviderBase
from dxaws_cloudfront.providers.aws import AwsProvider
from dxaws_core.aws_session import AwsSession
[docs]
@dataclass(frozen=True, slots=True, kw_only=True)
class ApplyOptions:
max_wait_seconds: int = 1800
poll_interval_seconds: int = 30
emit_events: bool = True
[docs]
@dataclass(frozen=True, slots=True, kw_only=True)
class ExecuteOptions:
apply_options: ApplyOptions | None = None
emit_events: bool = True
[docs]
@dataclass(frozen=True, slots=True, kw_only=True)
class CloudFrontManager:
provider: ProviderBase | None = None
o11y: O11y = field(default_factory=O11y.noop)
def __post_init__(self) -> None:
# Allow manager to be constructed without explicit provider
if self.provider is None:
object.__setattr__(self, "provider", AwsProvider(aws=AwsSession()))
def _emit(self, event: str, **fields: Any) -> None:
self.o11y.info(event, **fields)
def _plan_action(self, plan: Plan, current: DistributionCurrent) -> str:
types = [a.type for a in plan.actions]
if any(t.value == "create_distribution" for t in types):
return "create"
if any(t.value == "update_distribution" for t in types):
return "update"
if any(t.value == "tag_distribution" for t in types):
return "update"
if any(t.value in ("disable_distribution", "delete_distribution") for t in types):
return "destroy"
if not plan.actions and current.exists and current.status and current.status != "Deployed":
return "wait"
return "noop"
[docs]
def get_current(self, desired: DistributionDesired) -> DistributionCurrent:
ref = None
aliases = list(desired.aliases or ())
if aliases and hasattr(self.provider, "find_distribution_by_alias"):
try:
ref = self.provider.find_distribution_by_alias(aliases[0])
except Exception:
ref = None
if not ref:
return DistributionCurrent(exists=False, name=desired.name)
cfg, etag = self.provider.get_distribution(ref.id)
aliases_items = ((cfg.get("Aliases") or {}).get("Items")) or []
origin_items = ((cfg.get("Origins") or {}).get("Items")) or []
origin_domain = (origin_items[0].get("DomainName") if origin_items else None)
dcb = cfg.get("DefaultCacheBehavior") or {}
vpp = dcb.get("ViewerProtocolPolicy")
vc = cfg.get("ViewerCertificate") or {}
acm_arn = vc.get("ACMCertificateArn")
using_default = (
bool(vc.get("CloudFrontDefaultCertificate"))
if "CloudFrontDefaultCertificate" in vc
else None
)
tags: dict[str, str] | None = None
if ref.arn and hasattr(self.provider, "list_tags"):
try:
tags = self.provider.list_tags(ref.arn)
except Exception:
tags = None
return DistributionCurrent(
exists=True,
name=desired.name,
id=ref.id,
arn=ref.arn,
domain_name=ref.domain_name,
status=ref.status,
enabled=cfg.get("Enabled"),
comment=cfg.get("Comment"),
aliases=tuple(aliases_items) if aliases_items else (),
origin_domain_name=origin_domain,
default_root_object=cfg.get("DefaultRootObject"),
viewer_protocol_policy=vpp,
acm_certificate_arn=acm_arn,
using_default_certificate=using_default,
price_class=cfg.get("PriceClass"),
http_version=cfg.get("HttpVersion"),
ipv6_enabled=cfg.get("IsIPV6Enabled"),
tags=tags,
raw_config=cfg,
etag=etag,
)
[docs]
def plan(self, desired: DistributionDesired, current: DistributionCurrent | None = None) -> Plan:
current = current or self.get_current(desired)
self._emit(
"manager.plan.start",
module="dxaws-cloudfront",
aliases=list(desired.aliases),
origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name,
)
# Best-effort: provide OAC id if it exists.
oac_id = None
try:
o = self.provider.find_oac_by_name(resolve_oac_name(desired))
if o:
oac_id = o.id
except Exception:
oac_id = None
p = build_plan(desired, current, oac_id=oac_id, wait_deployed=False)
action = self._plan_action(p, current)
self._emit(
"manager.plan.done",
module="dxaws-cloudfront",
aliases=list(desired.aliases),
origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name,
action=action,
)
return p
def _wait_deployed(self, dist_id: str, *, options: ApplyOptions, desired: DistributionDesired) -> None:
start = time.monotonic()
poll = 0
while True:
status = self.provider.get_distribution_status(dist_id)
poll += 1
if status == "Deployed":
return
if time.monotonic() - start >= options.max_wait_seconds:
raise TimeoutError("CloudFront distribution did not reach Deployed before timeout")
if options.emit_events and (poll == 1 or poll % 4 == 0):
self._emit(
"manager.apply.progress",
module="dxaws-cloudfront",
aliases=list(desired.aliases),
origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name,
action="wait",
status=status,
poll_count=poll,
)
time.sleep(max(options.poll_interval_seconds, 0))
[docs]
def apply(self, plan: Plan, *, options: ApplyOptions | None = None) -> DistributionResult:
options = options or ApplyOptions()
current = plan.current
action = self._plan_action(plan, current)
if options.emit_events:
self._emit(
"manager.apply.start",
module="dxaws-cloudfront",
aliases=list(plan.desired.aliases),
origin_domain_name=plan.desired.origin.domain_name or plan.desired.origin.bucket_name,
action=action,
)
result: DistributionResult | None = None
if not plan.actions:
if current.exists and current.id:
result = DistributionResult(
name=plan.desired.name,
id=current.id,
arn=current.arn,
domain_name=current.domain_name,
hosted_zone_id=CLOUDFRONT_HOSTED_ZONE_ID,
)
if current.status and current.status != "Deployed":
self._wait_deployed(current.id, options=options, desired=plan.desired)
else:
raise RuntimeError("No actions planned and distribution does not exist")
else:
exec_out = execute(self.provider, plan)
result = exec_out.result
if any(a.type.value in ("create_distribution", "update_distribution") for a in plan.actions):
self._wait_deployed(result.id, options=options, desired=plan.desired)
if options.emit_events:
self._emit(
"manager.apply.done",
module="dxaws-cloudfront",
aliases=list(plan.desired.aliases),
origin_domain_name=plan.desired.origin.domain_name or plan.desired.origin.bucket_name,
action=action,
outcome="applied",
distribution_id=result.id if result else None,
)
if result is None:
raise RuntimeError("apply did not resolve a distribution result")
return result
[docs]
def execute(self, desired: DistributionDesired, *, options: ExecuteOptions | None = None) -> ManagerResult:
options = options or ExecuteOptions()
current = self.get_current(desired)
plan = self.plan(desired, current)
action = self._plan_action(plan, current)
try:
# If no changes are required, do not run apply.
if plan.is_noop and action == "noop":
result = DistributionResult(
name=desired.name,
id=current.id or "",
arn=current.arn,
domain_name=current.domain_name,
hosted_zone_id=CLOUDFRONT_HOSTED_ZONE_ID,
)
outcome = "noop"
if options.emit_events:
self._emit(
"manager.execute.done",
module="dxaws-cloudfront",
aliases=list(desired.aliases),
origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name,
action=action,
outcome=outcome,
distribution_id=result.id,
)
return ManagerResult(
desired=desired,
current=current,
plan=plan,
result=result,
outcome=outcome,
)
# Otherwise, apply the plan.
result = self.apply(plan, options=options.apply_options)
outcome = "applied"
if options.emit_events:
self._emit(
"manager.execute.done",
module="dxaws-cloudfront",
aliases=list(desired.aliases),
origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name,
action=action,
outcome=outcome,
distribution_id=result.id,
)
return ManagerResult(
desired=desired,
current=current,
plan=plan,
result=result,
outcome=outcome,
)
except Exception as exc:
if options.emit_events:
self._emit(
"manager.execute.done",
module="dxaws-cloudfront",
aliases=list(desired.aliases),
origin_domain_name=desired.origin.domain_name or desired.origin.bucket_name,
action=action,
outcome="failed",
error=str(exc),
)
raise
[docs]
def destroy(self, desired: DistributionDesired, *, options: ExecuteOptions | None = None) -> ManagerResult:
"""Destroy a distribution by converging to desired.present=False.
Pass the same `desired` used for creation so aliases can locate the
distribution deterministically.
"""
absent = replace(desired, present=False)
return self.execute(absent, options=options)