Source code for dxaws_s3.manager

"""The Manager - Public module facade for dxaws-s3."""

from __future__ import annotations
import uuid
from importlib import metadata
from typing import Any

from dataclasses import dataclass, field
import time
from dxaws_core.o11y import O11y

from dxaws_s3.executor import execute
from dxaws_s3.models import S3BucketDesired, S3BucketPlan, S3BucketResult, S3BucketCurrent
from dxaws_s3.planner import PlanOptions, plan
from dxaws_s3.providers.base import S3Provider


@dataclass(frozen=True)
class ApplyOptions:
    """Immutable switches for applying a bucket plan.

    Attributes:
        emit_legacy_events: Whether the legacy event stream should be
            emitted during an apply. Enabled by default.
    """

    # Legacy events stay on unless a caller opts out explicitly.
    emit_legacy_events: bool = True
@dataclass(frozen=True)
class ExecuteOptions:
    """Immutable options bundle for a combined plan-and-apply run.

    Attributes:
        plan_options: Planner options for this run; ``None`` means no
            override was supplied.
        apply_options: Apply options for this run; ``None`` means no
            override was supplied.
        emit_legacy_events: Legacy-event toggle carried by this bundle.
            Enabled by default.
    """

    # Both overrides are optional; ``None`` leaves the choice to the consumer.
    plan_options: PlanOptions | None = None
    apply_options: ApplyOptions | None = None
    emit_legacy_events: bool = True
@dataclass(frozen=True)
class S3ManagerResult:
    """Immutable record of one end-to-end manager run.

    Attributes:
        desired: The desired bucket state that was requested.
        current: The bucket state observed before planning.
        plan: The plan computed from ``desired`` and ``current``.
        result: The outputs of the run, or ``None`` when there are none.
        outcome: Short outcome label for the run (for example ``"noop"``
            or ``"applied"``).
    """

    desired: S3BucketDesired
    current: S3BucketCurrent
    plan: S3BucketPlan
    result: S3BucketResult | None
    outcome: str
@dataclass(frozen=True)
class S3Manager:
    """Public facade that plans and applies changes for a single S3 bucket.

    The manager reads the current bucket state from ``provider``, delegates
    diffing to the module-level ``plan`` function and execution to the
    module-level ``execute`` function, and reports progress through ``o11y``.

    Attributes:
        provider: Backend used to read bucket state and outputs.
        plan_options: Planner options used when a call supplies none.
        o11y: Event sink; defaults to ``O11y.noop()``.
    """

    provider: S3Provider
    plan_options: PlanOptions = field(default_factory=PlanOptions)
    o11y: O11y = field(default_factory=O11y.noop)

    def _module_version(self) -> str:
        """Return the installed ``dxaws-s3`` version, or ``"unknown"``."""
        try:
            return metadata.version("dxaws-s3")
        except Exception:
            # Deliberately best-effort: a missing or broken distribution
            # record must never prevent a plan/apply from running.
            return "unknown"

    def _emit(self, event: str, **fields: Any) -> None:
        """Send one event with its fields to the observability sink."""
        # Keep a single emission point so we can normalize field shape later.
        self.o11y.info(event, **fields)

    def _plan_summary(self, p: S3BucketPlan) -> dict[str, Any]:
        """Build the compact plan description attached to manager events."""
        return {
            "noop": p.is_noop,
            "operation_count": len(p.operations),
            "operations": [op.op_type.value for op in p.operations],
            "types": [op.type.value for op in p.operations],
        }

    def get_current(self, desired: S3BucketDesired) -> S3BucketCurrent:
        """Fetch the current state of the bucket named by ``desired``."""
        return self.provider.get_current(desired.name)

    def plan(
        self,
        desired: S3BucketDesired,
        current: S3BucketCurrent | None = None,
        *,
        options: PlanOptions | None = None,
    ) -> S3BucketPlan:
        """Compute the plan that moves the bucket from ``current`` to ``desired``.

        Args:
            desired: Target bucket state.
            current: Already-fetched bucket state. Only ``None`` triggers a
                provider lookup; any supplied object is used as given.
            options: Planner options. Only ``None`` falls back to
                ``self.plan_options``.

        Returns:
            A new ``S3BucketPlan`` carrying the planner's desired/current/
            operations plus its metadata extended with a fresh ``plan_id``.
        """
        plan_id = uuid.uuid4().hex
        # Explicit ``is None`` checks: a falsy-but-valid argument must not be
        # silently replaced (and must not cost an extra provider round trip).
        if current is None:
            current = self.get_current(desired)
        if options is None:
            options = self.plan_options

        bucket = desired.name
        common = {
            "module": "dxaws-s3",
            "module_version": self._module_version(),
            "plan_id": plan_id,
            "target": f"s3:bucket:{bucket}",
        }

        start = time.monotonic()
        self._emit("manager.plan.start", **common, bucket=bucket)
        self._emit("s3.plan.start", bucket=bucket)
        self._emit("plan_started", **common)

        # Inside a method body the bare name ``plan`` resolves to the
        # module-level planner function, not to this method.
        p = plan(desired, current, options=options)
        summary = self._plan_summary(p)

        self._emit("s3.plan.done", bucket=bucket, operations=summary["operations"])
        self._emit(
            "plan_completed",
            **common,
            ops=summary["operations"],
            types=summary["types"],
        )
        self._emit(
            "manager.plan.done",
            **common,
            bucket=bucket,
            duration_ms=int((time.monotonic() - start) * 1000),
            plan_summary=summary,
        )

        # Copy before adding the id so the planner's metadata is not mutated.
        merged_metadata = dict(p.metadata) if p.metadata else {}
        merged_metadata["plan_id"] = plan_id
        return S3BucketPlan(
            desired=p.desired,
            current=p.current,
            operations=p.operations,
            metadata=merged_metadata,
        )

    def apply(
        self,
        desired: S3BucketDesired,
        *,
        options: ApplyOptions | None = None,
    ) -> S3BucketResult:
        """Plan against the live bucket state and apply the result."""
        return self.apply_plan(self.plan(desired), options=options)

    def apply_plan(
        self,
        p: S3BucketPlan,
        *,
        options: ApplyOptions | None = None,
    ) -> S3BucketResult:
        """Execute a previously computed plan.

        Args:
            p: Plan to execute. Its ``plan_id`` metadata entry is reused for
                event correlation; a new id is generated when it is absent.
            options: Apply options; ``None`` means ``ApplyOptions()``.

        Returns:
            The provider outputs for a no-op plan, otherwise the value
            returned by the module-level ``execute`` function.

        Raises:
            Exception: Any error raised while applying is re-raised after
                the failure events have been emitted.
        """
        if options is None:
            options = ApplyOptions()
        legacy = options.emit_legacy_events

        bucket = p.desired.name
        # Generate an id only when the plan does not already carry one.
        plan_id = (p.metadata or {}).get("plan_id") or uuid.uuid4().hex
        base = {
            "module": "dxaws-s3",
            "module_version": self._module_version(),
            "plan_id": plan_id,
        }
        common = {**base, "target": f"s3:bucket:{bucket}"}
        summary = self._plan_summary(p)

        # NOTE(review): only the ``s3.apply.*`` events are treated as gated by
        # ``emit_legacy_events``; the ``plan_*`` events are emitted
        # unconditionally, as in ``plan()``. Confirm against the original
        # layout.
        if legacy:
            self._emit("s3.apply.start", bucket=bucket)
        self._emit("plan_started", **common)

        if p.is_noop:
            if legacy:
                self._emit("s3.apply.noop", bucket=bucket)
            self._emit("plan_completed", **common, ops=[], types=[])
            return self.provider.get_outputs(bucket)

        def _operation_event(event: str):
            """Return an executor callback that emits ``event`` per operation."""

            def _callback(idx, op):
                self._emit(
                    event,
                    **base,
                    op_index=idx,
                    op_type=op.op_type.value,
                    target=op.target,
                    reason=op.reason,
                )

            return _callback

        # Started before the ``try`` so the failure event can report a
        # duration as well as the success event.
        apply_start = time.monotonic()
        try:
            self._emit("manager.apply.start", **common, bucket=bucket)
            # The bare name ``execute`` is the module-level executor function,
            # not the ``execute`` method of this class.
            result = execute(
                p,
                self.provider,
                on_op_start=_operation_event("operation_started"),
                on_op_done=_operation_event("operation_completed"),
            )
            if legacy:
                self._emit(
                    "s3.apply.done",
                    bucket=bucket,
                    operations=summary["operations"],
                )
            self._emit(
                "plan_completed",
                **common,
                ops=summary["operations"],
                types=summary["types"],
            )
            self._emit(
                "manager.apply.done",
                **common,
                bucket=bucket,
                outcome="applied",
                duration_ms=int((time.monotonic() - apply_start) * 1000),
                plan_summary=summary,
            )
            return result
        except Exception as e:
            if legacy:
                self._emit("s3.apply.error", bucket=bucket, error=str(e))
            self._emit("plan_failed", **common, error=str(e))
            self._emit(
                "manager.apply.done",
                **common,
                bucket=bucket,
                outcome="failed",
                duration_ms=int((time.monotonic() - apply_start) * 1000),
                plan_summary=summary,
                error=str(e),
            )
            raise

    def execute(
        self,
        desired: S3BucketDesired,
        *,
        options: ExecuteOptions | None = None,
    ) -> S3ManagerResult:
        """Fetch current state, plan, and apply in a single call.

        Args:
            desired: Target bucket state.
            options: Run options; ``None`` means ``ExecuteOptions()``. When
                ``options.apply_options`` is not given, the apply step uses
                ``ApplyOptions(emit_legacy_events=options.emit_legacy_events)``
                so the run-level flag is honored.

        Returns:
            An ``S3ManagerResult`` whose ``outcome`` is ``"noop"`` or
            ``"applied"``.

        Raises:
            Exception: Any error from the apply step is re-raised after a
                ``manager.execute.done`` event with ``outcome="failed"``.
        """
        if options is None:
            options = ExecuteOptions()
        plan_options = options.plan_options
        if plan_options is None:
            plan_options = self.plan_options
        apply_options = options.apply_options
        if apply_options is None:
            # Without this the run-level ``emit_legacy_events`` flag would be
            # ignored and the apply step would always use the default.
            apply_options = ApplyOptions(
                emit_legacy_events=options.emit_legacy_events,
            )

        current = self.get_current(desired)
        p = self.plan(desired, current, options=plan_options)
        summary = self._plan_summary(p)

        def _done(outcome: str, **extra: Any) -> None:
            """Emit the closing ``manager.execute.done`` event."""
            self._emit(
                "manager.execute.done",
                module="dxaws-s3",
                module_version=self._module_version(),
                plan_id=p.metadata.get("plan_id"),
                target=f"s3:bucket:{desired.name}",
                bucket=desired.name,
                outcome=outcome,
                plan_summary=summary,
                **extra,
            )

        if p.is_noop:
            result = self.provider.get_outputs(desired.name)
            _done("noop")
            return S3ManagerResult(
                desired=desired,
                current=current,
                plan=p,
                result=result,
                outcome="noop",
            )

        try:
            result = self.apply_plan(p, options=apply_options)
            _done("applied")
            return S3ManagerResult(
                desired=desired,
                current=current,
                plan=p,
                result=result,
                outcome="applied",
            )
        except Exception as e:
            # apply_plan already emitted failure events
            _done("failed", error=str(e))
            raise