Source code for dxaws_cloudfront.executor

from __future__ import annotations

"""Executor for dxaws-cloudfront.

Takes a Plan and applies actions using the provider.

This file should stay boring:
- no planning logic
- no AWS discovery logic beyond what is necessary to execute actions
"""

from dataclasses import dataclass
from typing import Any

from .constants import CLOUDFRONT_HOSTED_ZONE_ID
from .models import ActionType, DistributionResult, Plan
from .planner import resolve_oac_name, resolve_origin_domain, synth_distribution_config, stable_comment


[docs] @dataclass(frozen=True) class ExecuteResult: """Return type for executor.execute().""" result: DistributionResult oac_id: str | None = None
[docs] def execute(provider, p: Plan) -> ExecuteResult: """Execute a plan and return outputs.""" desired = p.desired current = p.current # Track the distribution identifiers across actions. dist_id: str | None = current.id dist_arn: str | None = current.arn dist_domain: str | None = current.domain_name # OAC id can be created during execution and then used for distribution config. oac_id: str | None = None # If the plan includes CREATE_OAC, create/ensure it now and keep the id. for a in p.actions: if a.type == ActionType.CREATE_OAC: oac_name = a.payload.get("oac_name") or resolve_oac_name(desired) desc = a.payload.get("description") or f"dxaws-cloudfront OAC for {desired.name}" oac = provider.ensure_oac(name=oac_name, description=desc) oac_id = oac.id break # If we didn't plan CREATE_OAC but we still might need an OAC id for config, # we can try to find it by name (best-effort). if oac_id is None: try: o = provider.find_oac_by_name(resolve_oac_name(desired)) if o: oac_id = o.id except Exception: oac_id = None # Apply create/update/tag/wait actions in order. for a in p.actions: if a.type == ActionType.CREATE_OAC: # already handled above continue if a.type == ActionType.CREATE_DISTRIBUTION: cfg = a.payload.get("distribution_config") if not cfg: origin_domain = resolve_origin_domain(desired.origin) cfg = synth_distribution_config(desired, origin_domain=origin_domain, oac_id=oac_id) # Ensure comment is stable (idempotent discovery) cfg["Comment"] = cfg.get("Comment") or (desired.comment or stable_comment(desired.name)) tags = a.payload.get("tags") or desired.tags or None ref = provider.create_distribution(config=cfg, tags=tags) dist_id = ref.id dist_arn = ref.arn dist_domain = ref.domain_name elif a.type == ActionType.UPDATE_DISTRIBUTION: if not dist_id: raise RuntimeError("update_distribution planned but no distribution id is known") cfg = a.payload.get("distribution_config") if not cfg: origin_domain = resolve_origin_domain(desired.origin) cfg = synth_distribution_config(desired, origin_domain=origin_domain, oac_id=oac_id) # CloudFront updates require IfMatch from get_distribution_config _, etag = provider.get_distribution(dist_id) ref = provider.update_distribution(dist_id=dist_id, if_match=etag, config=cfg) dist_arn = ref.arn or dist_arn dist_domain = ref.domain_name or dist_domain elif a.type == ActionType.DISABLE_DISTRIBUTION: if not dist_id: raise RuntimeError("disable_distribution planned but no distribution id is known") provider.disable_distribution(dist_id) elif a.type == ActionType.DELETE_DISTRIBUTION: if not dist_id: raise RuntimeError("delete_distribution planned but no distribution id is known") provider.delete_distribution(dist_id) dist_id = None dist_arn = None dist_domain = None elif a.type == ActionType.TAG_DISTRIBUTION: if not dist_arn: # If we don't have the arn yet, try to reconstruct it from dist_id # (CloudFront ARNs are returned on create; on update, also returned). # We'll fall back to querying a list if needed later. raise RuntimeError("tag_distribution planned but distribution ARN is unknown") tags = a.payload.get("tags") or desired.tags if tags: provider.tag_resource(dist_arn, tags) elif a.type == ActionType.WAIT_DEPLOYED: if not dist_id: raise RuntimeError("wait_deployed planned but no distribution id is known") provider.wait_deployed(dist_id) else: raise RuntimeError(f"Unsupported action type: {a.type}") if desired.present and not dist_id: raise RuntimeError("Execution completed without a distribution id") # CloudFront hosted zone id is a well-known constant, but we keep it explicit. # If you prefer, we can populate it later from docs/constants. return ExecuteResult( result=DistributionResult( name=desired.name, id=dist_id or "", arn=dist_arn, domain_name=dist_domain, hosted_zone_id=CLOUDFRONT_HOSTED_ZONE_ID, ), oac_id=oac_id, )