from __future__ import annotations
from dataclasses import dataclass
from .config import DnsRecordSpec
from .providers.aws import RecordSetInfo, Route53Provider, ZoneInfo
from .types import RecordPlan, StepAction, ZonePlan, ZoneVisibility
[docs]
@dataclass(frozen=True, slots=True, kw_only=True)
class ZoneApplyResult:
zones: dict[str, ZoneInfo] # canonical zone -> info
created: list[str] # list of zone names created
noop: list[str] # list of zone names that were already present
[docs]
@dataclass(frozen=True, slots=True, kw_only=True)
class RecordApplyResult:
records: dict[tuple[str, str], RecordSetInfo] # (fqdn, type) -> info
upserted: list[tuple[str, str]] # record keys changed/created
noop: list[tuple[str, str]] # record keys already matching
[docs]
def apply_zones(*, provider: Route53Provider, plan: ZonePlan) -> ZoneApplyResult:
    """Carry out each step of a zone plan and report the outcome.

    NOOP steps reuse the zone info recorded in ``plan.observed`` and fall back
    to a provider lookup when the plan holds none; CREATE_PUBLIC_ZONE steps
    create the zone through the provider.

    Raises:
        RuntimeError: If a NOOP zone can no longer be found, or a step carries
            an action this executor does not handle.
    """
    zone_info_by_name: dict[str, ZoneInfo] = {}
    created_names: list[str] = []
    unchanged_names: list[str] = []

    for step in plan.steps:
        name = step.zone_name

        if step.action == StepAction.NOOP:
            # Prefer the info captured at plan time; re-read only when absent.
            info = plan.observed.get(name)
            if not info:
                info = provider.get_zone_by_name(name)
            if info is None:
                # The zone existed when the plan was made but is gone now.
                raise RuntimeError(f"Zone '{step.zone_name}' disappeared between plan and apply")
            zone_info_by_name[name] = info
            unchanged_names.append(name)
        elif step.action == StepAction.CREATE_PUBLIC_ZONE:
            zone_info_by_name[name] = provider.create_public_zone(name)
            created_names.append(name)
        else:
            raise RuntimeError(f"Unsupported step action: {step.action}")

    return ZoneApplyResult(
        zones=zone_info_by_name,
        created=created_names,
        noop=unchanged_names,
    )
[docs]
def _fqdn_with_trailing_dot(name: str) -> str:
    """Return *name* normalized to end in exactly one trailing dot."""
    return name.rstrip(".") + "."


def _alias_field(alias: object, snake_name: str, api_name: str) -> object:
    """Return the first truthy alias field, trying *snake_name* then *api_name*.

    Dict aliases are read by key and any other object by attribute. ``None``
    is returned when neither name holds a truthy value.
    """
    for name in (snake_name, api_name):
        value = alias.get(name) if isinstance(alias, dict) else getattr(alias, name, None)
        if value:
            return value
    return None


def _resolve_public_zone_id(
    *,
    provider: Route53Provider,
    zone_name: str,
    cache: dict[str, str],
) -> str:
    """Return the hosted zone id for *zone_name*, filling *cache* on a miss.

    Raises:
        RuntimeError: If the provider no longer finds the zone, or the zone's
            visibility is private.
    """
    if zone_name in cache:
        return cache[zone_name]

    hz = provider.get_zone_by_name(zone_name)
    if hz is None:
        raise RuntimeError(f"Zone '{zone_name}' disappeared between plan and apply")
    if hz.visibility == ZoneVisibility.PRIVATE:
        raise RuntimeError(f"Zone '{zone_name}' is private; refusing to apply public DNS records")

    cache[zone_name] = hz.zone_id
    return hz.zone_id


def _upsert_alias_record(
    *,
    provider: Route53Provider,
    zone_id: str,
    key: tuple[str, str],
    alias: object,
) -> None:
    """UPSERT the alias record identified by *key* from a dict or object *alias*.

    Raises:
        RuntimeError: If the provider has no ``upsert_alias_record_set``
            method, or the alias lacks a dns_name or hosted_zone_id.
    """
    if not hasattr(provider, "upsert_alias_record_set"):
        raise RuntimeError(
            "Route53Provider does not support alias UPSERT yet (missing upsert_alias_record_set)"
        )

    dns_name = _alias_field(alias, "dns_name", "DNSName")
    hosted_zone_id = _alias_field(alias, "hosted_zone_id", "HostedZoneId")
    evaluate_target_health = _alias_field(
        alias, "evaluate_target_health", "EvaluateTargetHealth"
    )
    if not dns_name or not hosted_zone_id:
        raise RuntimeError(f"Alias record {key} requires dns_name and hosted_zone_id")

    provider.upsert_alias_record_set(
        zone_id=zone_id,
        fqdn=key[0],
        record_type=key[1],
        dns_name=_fqdn_with_trailing_dot(str(dns_name)),
        hosted_zone_id=str(hosted_zone_id),
        # A missing flag is treated as False.
        evaluate_target_health=bool(evaluate_target_health),
    )


def apply_records(*, provider: Route53Provider, plan: RecordPlan) -> RecordApplyResult:
    """Apply a record convergence plan.

    Supports:
    - Non-alias records: UPSERT with TTL + values
    - Alias records: UPSERT with AliasTarget (dns_name + hosted_zone_id)

    The planner controls what steps exist; the executor simply performs them.
    Records are keyed by ``(fqdn, type)`` with the fqdn carrying one trailing
    dot and the type upper-cased.

    Args:
        provider: Provider used to look up zones and read/write record sets.
        plan: Plan supplying ``desired`` specs, ``observed`` record sets and
            the ``steps`` to execute.

    Returns:
        The record set info for every step, plus the keys that were upserted
        and the keys that needed no change.

    Raises:
        ValueError: If two desired specs normalize to the same key.
        RuntimeError: If a zone or record disappeared since planning, the zone
            is private, an UPSERT step has no desired spec, an alias is
            incomplete or unsupported by the provider, a non-alias record has
            no TTL, the record is not visible after UPSERT, or the step action
            is not supported.
    """
    records: dict[tuple[str, str], RecordSetInfo] = {}
    upserted: list[tuple[str, str]] = []
    noop: list[tuple[str, str]] = []

    # Index desired specs by key so UPSERT steps can retrieve ttl/values/alias.
    desired_by_key: dict[tuple[str, str], DnsRecordSpec] = {}
    for spec in plan.desired:
        key = (_fqdn_with_trailing_dot(spec.fqdn()), spec.type.upper())
        if key in desired_by_key:
            raise ValueError(f"Duplicate desired record spec for {key}")
        desired_by_key[key] = spec

    # Cache zone name -> id to avoid repeated provider lookups.
    zone_id_cache: dict[str, str] = {}

    for step in plan.steps:
        key = (_fqdn_with_trailing_dot(step.fqdn), step.record_type.upper())
        fqdn, record_type = key
        zone_id = _resolve_public_zone_id(
            provider=provider,
            zone_name=_fqdn_with_trailing_dot(step.zone_name),
            cache=zone_id_cache,
        )

        if step.action == StepAction.NOOP_RECORD:
            # Use the record observed at plan time if present, otherwise re-read.
            rr = plan.observed.get(key) or provider.get_record_set(
                zone_id=zone_id,
                fqdn=fqdn,
                record_type=record_type,
            )
            if rr is None:
                raise RuntimeError(
                    f"Record {key[0]} {key[1]} disappeared between plan and apply"
                )
            records[key] = rr
            noop.append(key)
            continue

        if step.action == StepAction.UPSERT_RECORD:
            spec = desired_by_key.get(key)
            if spec is None:
                raise RuntimeError(f"Missing desired spec for record {key}")

            # A spec may expose its alias under either attribute name.
            alias = getattr(spec, "alias", None) or getattr(spec, "alias_target", None)
            if alias is not None:
                _upsert_alias_record(
                    provider=provider,
                    zone_id=zone_id,
                    key=key,
                    alias=alias,
                )
            else:
                if spec.ttl is None:
                    raise RuntimeError(f"TTL is required for non-alias record {key}")
                provider.upsert_record_set(
                    zone_id=zone_id,
                    fqdn=fqdn,
                    record_type=record_type,
                    ttl=spec.ttl,
                    values=list(spec.values or []),
                )

            # Re-read so the result reflects what the provider now reports.
            rr2 = provider.get_record_set(
                zone_id=zone_id,
                fqdn=fqdn,
                record_type=record_type,
            )
            if rr2 is None:
                raise RuntimeError(f"Record {key[0]} {key[1]} was not visible after UPSERT")
            records[key] = rr2
            upserted.append(key)
            continue

        raise RuntimeError(f"Unsupported step action: {step.action}")

    return RecordApplyResult(records=records, upserted=upserted, noop=noop)