Source code for dxaws_dns.executor

from __future__ import annotations

from dataclasses import dataclass

from .config import DnsRecordSpec
from .providers.aws import RecordSetInfo, Route53Provider, ZoneInfo
from .types import RecordPlan, StepAction, ZonePlan, ZoneVisibility


[docs] @dataclass(frozen=True, slots=True, kw_only=True) class ZoneApplyResult: zones: dict[str, ZoneInfo] # canonical zone -> info created: list[str] # list of zone names created noop: list[str] # list of zone names that were already present
[docs] @dataclass(frozen=True, slots=True, kw_only=True) class RecordApplyResult: records: dict[tuple[str, str], RecordSetInfo] # (fqdn, type) -> info upserted: list[tuple[str, str]] # record keys changed/created noop: list[tuple[str, str]] # record keys already matching
[docs] def apply_zones(*, provider: Route53Provider, plan: ZonePlan) -> ZoneApplyResult: zones: dict[str, ZoneInfo] = {} created: list[str] = [] noop: list[str] = [] for step in plan.steps: if step.action == StepAction.NOOP: # Ensure deterministic outputs: re-read the zone info if not already captured. hz = plan.observed.get(step.zone_name) or provider.get_zone_by_name(step.zone_name) if hz is None: # Drift between plan and apply; DC should treat this as an error. raise RuntimeError(f"Zone '{step.zone_name}' disappeared between plan and apply") zones[step.zone_name] = hz noop.append(step.zone_name) continue if step.action == StepAction.CREATE_PUBLIC_ZONE: hz = provider.create_public_zone(step.zone_name) zones[step.zone_name] = hz created.append(step.zone_name) continue raise RuntimeError(f"Unsupported step action: {step.action}") return ZoneApplyResult(zones=zones, created=created, noop=noop)
[docs] def apply_records(*, provider: Route53Provider, plan: RecordPlan) -> RecordApplyResult: """Apply a record convergence plan. Supports: - Non-alias records: UPSERT with TTL + values - Alias records: UPSERT with AliasTarget (dns_name + hosted_zone_id) The planner controls what steps exist; executor simply performs them. """ records: dict[tuple[str, str], RecordSetInfo] = {} upserted: list[tuple[str, str]] = [] noop: list[tuple[str, str]] = [] # Build an index of desired specs by key so executor can retrieve ttl/values. desired_by_key: dict[tuple[str, str], DnsRecordSpec] = {} for spec in plan.desired: key = (spec.fqdn().rstrip(".") + ".", spec.type.upper()) if key in desired_by_key: raise ValueError(f"Duplicate desired record spec for {key}") desired_by_key[key] = spec # Cache zone name -> id to avoid repeated API calls. zone_id_cache: dict[str, str] = {} for step in plan.steps: key = (step.fqdn.rstrip(".") + ".", step.record_type.upper()) # Resolve hosted zone id. zone_name = step.zone_name.rstrip(".") + "." if zone_name in zone_id_cache: zone_id = zone_id_cache[zone_name] else: hz = provider.get_zone_by_name(zone_name) if hz is None: raise RuntimeError(f"Zone '{zone_name}' disappeared between plan and apply") if hz.visibility == ZoneVisibility.PRIVATE: raise RuntimeError(f"Zone '{zone_name}' is private; refusing to apply public DNS records") zone_id = hz.zone_id zone_id_cache[zone_name] = zone_id if step.action == StepAction.NOOP_RECORD: # Deterministic outputs: use observed if present, otherwise re-read. rr = plan.observed.get(key) or provider.get_record_set( zone_id=zone_id, fqdn=key[0], record_type=key[1], ) if rr is None: raise RuntimeError( f"Record {key[0]} {key[1]} disappeared between plan and apply" ) records[key] = rr noop.append(key) continue if step.action == StepAction.UPSERT_RECORD: spec = desired_by_key.get(key) if spec is None: raise RuntimeError(f"Missing desired spec for record {key}") alias = getattr(spec, "alias", None) or getattr(spec, "alias_target", None) # Alias record UPSERT if alias is not None: # Require provider support. if not hasattr(provider, "upsert_alias_record_set"): raise RuntimeError( "Route53Provider does not support alias UPSERT yet (missing upsert_alias_record_set)" ) if isinstance(alias, dict): dns_name = alias.get("dns_name") or alias.get("DNSName") hosted_zone_id = alias.get("hosted_zone_id") or alias.get("HostedZoneId") eth = alias.get("evaluate_target_health") or alias.get("EvaluateTargetHealth") else: dns_name = getattr(alias, "dns_name", None) or getattr(alias, "DNSName", None) hosted_zone_id = getattr(alias, "hosted_zone_id", None) or getattr(alias, "HostedZoneId", None) eth = getattr(alias, "evaluate_target_health", None) or getattr(alias, "EvaluateTargetHealth", None) # dns_name = getattr(alias, "dns_name", None) or getattr( alias, "DNSName", None) or # alias.get("dns_name") if isinstance(alias, dict) else None # hosted_zone_id = getattr(alias, "hosted_zone_id", None) or getattr(alias, "HostedZoneId", None) or # alias.get("hosted_zone_id") if isinstance(alias, dict) else None # eth = getattr(alias, "evaluate_target_health", None) if not isinstance(alias, dict) else # alias.get("evaluate_target_health") if not dns_name or not hosted_zone_id: raise RuntimeError(f"Alias record {key} requires dns_name and hosted_zone_id") provider.upsert_alias_record_set( zone_id=zone_id, fqdn=key[0], record_type=key[1], dns_name=str(dns_name).rstrip(".") + ".", hosted_zone_id=str(hosted_zone_id), evaluate_target_health=bool(eth) if eth is not None else False, ) # Non-alias record UPSERT else: if spec.ttl is None: raise RuntimeError(f"TTL is required for non-alias record {key}") provider.upsert_record_set( zone_id=zone_id, fqdn=key[0], record_type=key[1], ttl=spec.ttl, values=list(spec.values or []), ) # Re-read to return deterministic outputs. rr2 = provider.get_record_set( zone_id=zone_id, fqdn=key[0], record_type=key[1], ) if rr2 is None: raise RuntimeError(f"Record {key[0]} {key[1]} was not visible after UPSERT") records[key] = rr2 upserted.append(key) continue raise RuntimeError(f"Unsupported step action: {step.action}") return RecordApplyResult(records=records, upserted=upserted, noop=noop)