"""The Manager - Public module facade for dxaws-s3."""
from __future__ import annotations
import uuid
from importlib import metadata
from typing import Any
from dataclasses import dataclass, field
import time
from dxaws_core.o11y import O11y
from dxaws_s3.executor import execute
from dxaws_s3.models import S3BucketDesired, S3BucketPlan, S3BucketResult, S3BucketCurrent
from dxaws_s3.planner import PlanOptions, plan
from dxaws_s3.providers.base import S3Provider
[docs]
@dataclass(frozen=True)
class ApplyOptions:
    """Immutable options for the apply phase of ``S3Manager``."""

    # When True (the default), ``S3Manager.apply_plan`` also emits the
    # legacy ``s3.apply.*`` events next to the newer event names.
    emit_legacy_events: bool = True
[docs]
@dataclass(frozen=True)
class ExecuteOptions:
    """Immutable options bundle for a combined plan-and-apply run."""

    # Planner options for this run; ``None`` means "not specified here".
    plan_options: PlanOptions | None = None
    # Apply-phase options for this run; ``None`` means "not specified here".
    apply_options: ApplyOptions | None = None
    # Legacy-event toggle for the run as a whole; on by default.
    # NOTE(review): confirm how this is meant to interact with
    # ``apply_options.emit_legacy_events`` when both are supplied.
    emit_legacy_events: bool = True
[docs]
@dataclass(frozen=True)
class S3ManagerResult:
    """Immutable record of one ``S3Manager.execute`` run."""

    # The desired bucket state the caller asked for.
    desired: S3BucketDesired
    # The bucket state that was read before planning.
    current: S3BucketCurrent
    # The plan computed from ``desired`` and ``current``.
    plan: S3BucketPlan
    # The result of the run, or ``None`` when there is none to report.
    result: S3BucketResult | None
    # Short outcome label; ``S3Manager.execute`` sets "noop" or "applied".
    outcome: str
[docs]
@dataclass(frozen=True)
class S3Manager:
    """Facade that plans and applies changes for a single S3 bucket.

    The manager reads the current state from ``provider``, delegates
    planning to the module-level ``plan`` function and execution to the
    module-level ``execute`` function, and reports progress through
    ``o11y``. Three families of event names are emitted side by side:
    ``manager.*``, ``plan_*`` / ``operation_*``, and the legacy ``s3.*``
    names.
    """

    # Backend used to read bucket state and outputs and to run operations.
    provider: S3Provider
    # Planner options used whenever a call does not supply its own.
    plan_options: PlanOptions = field(default_factory=PlanOptions)
    # Event sink; defaults to whatever ``O11y.noop()`` returns.
    o11y: O11y = field(default_factory=O11y.noop)

    def _module_version(self) -> str:
        """Return the installed ``dxaws-s3`` version, or ``"unknown"``."""
        try:
            return metadata.version("dxaws-s3")
        except Exception:
            # Deliberately best-effort: the version is only attached to
            # events, so a metadata lookup problem must never break a run.
            return "unknown"

    def _emit(self, event: str, **fields: Any) -> None:
        """Send ``event`` with ``fields`` to the observability sink."""
        # Keep a single emission point so we can normalize field shape later.
        self.o11y.info(event, **fields)

    def _plan_summary(self, p: S3BucketPlan) -> dict[str, Any]:
        """Return a plain-dict summary of ``p`` for use in event payloads."""
        operations = p.operations
        return {
            "noop": p.is_noop,
            "operation_count": len(operations),
            "operations": [op.op_type.value for op in operations],
            "types": [op.type.value for op in operations],
        }

    def get_current(self, desired: S3BucketDesired) -> S3BucketCurrent:
        """Read the current state of the bucket named by ``desired.name``."""
        return self.provider.get_current(desired.name)

    def plan(
        self,
        desired: S3BucketDesired,
        current: S3BucketCurrent | None = None,
        *,
        options: PlanOptions | None = None,
    ) -> S3BucketPlan:
        """Compute a plan that moves the bucket from ``current`` to ``desired``.

        Args:
            desired: Target bucket state.
            current: Already-fetched bucket state. When ``None`` it is read
                through :meth:`get_current`.
            options: Planner options. When ``None`` the manager's
                ``plan_options`` are used.

        Returns:
            A new ``S3BucketPlan`` carrying the planner's ``desired``,
            ``current`` and ``operations``, with a freshly generated
            ``plan_id`` added to a copy of its metadata.
        """
        plan_id = uuid.uuid4().hex
        # Explicit None checks: a caller-supplied object must be honoured
        # even if it happens to evaluate as falsy.
        if current is None:
            current = self.get_current(desired)
        if options is None:
            options = self.plan_options

        bucket = desired.name
        common = {
            "module": "dxaws-s3",
            "module_version": self._module_version(),
            "plan_id": plan_id,
            "target": f"s3:bucket:{bucket}",
        }
        started = time.monotonic()

        self._emit("manager.plan.start", **common, bucket=bucket)
        self._emit("s3.plan.start", bucket=bucket)
        self._emit("plan_started", **common)

        # Inside a method body ``plan`` resolves to the module-level planner
        # function, not to this method.
        planned = plan(desired, current, options=options)
        summary = self._plan_summary(planned)

        self._emit("s3.plan.done", bucket=bucket, operations=summary["operations"])
        self._emit(
            "plan_completed",
            **common,
            ops=summary["operations"],
            types=summary["types"],
        )
        self._emit(
            "manager.plan.done",
            **common,
            bucket=bucket,
            duration_ms=int((time.monotonic() - started) * 1000),
            plan_summary=summary,
        )

        # Copy before adding the id so the planner's own mapping is untouched.
        merged_metadata = dict(planned.metadata) if planned.metadata else {}
        merged_metadata["plan_id"] = plan_id
        return S3BucketPlan(
            desired=planned.desired,
            current=planned.current,
            operations=planned.operations,
            metadata=merged_metadata,
        )

    def apply(
        self,
        desired: S3BucketDesired,
        *,
        options: ApplyOptions | None = None,
    ) -> S3BucketResult:
        """Plan for ``desired`` with default planning inputs, then apply it."""
        return self.apply_plan(self.plan(desired), options=options)

    def apply_plan(
        self,
        p: S3BucketPlan,
        *,
        options: ApplyOptions | None = None,
    ) -> S3BucketResult:
        """Apply an already-computed plan.

        Args:
            p: The plan to apply. Its ``plan_id`` metadata entry is reused
                for event correlation; a new id is generated if it is
                missing or empty.
            options: Apply options. Defaults to ``ApplyOptions()``.

        Returns:
            The provider's outputs for the bucket when the plan is a no-op,
            otherwise the value returned by the executor.

        Raises:
            Exception: Whatever the executor or provider raises. Failure
                events are emitted first and the error is re-raised as-is.
        """
        if options is None:
            options = ApplyOptions()
        legacy = options.emit_legacy_events

        bucket = p.desired.name
        # Only mint a new id when the plan does not already carry one.
        plan_id = (p.metadata or {}).get("plan_id") or uuid.uuid4().hex
        ids = {
            "module": "dxaws-s3",
            "module_version": self._module_version(),
            "plan_id": plan_id,
        }
        common = {**ids, "target": f"s3:bucket:{bucket}"}
        summary = self._plan_summary(p)

        if legacy:
            self._emit("s3.apply.start", bucket=bucket)
        self._emit("plan_started", **common)

        if p.is_noop:
            if legacy:
                self._emit("s3.apply.noop", bucket=bucket)
            self._emit("plan_completed", **common, ops=[], types=[])
            return self.provider.get_outputs(bucket)

        def _op_event(event: str):
            """Build an executor callback that reports ``event`` per operation."""

            def _callback(idx: int, op: Any) -> None:
                # ``target`` here is the operation's own target, which is why
                # ``ids`` (without the bucket-level target) is used.
                self._emit(
                    event,
                    **ids,
                    op_index=idx,
                    op_type=op.op_type.value,
                    target=op.target,
                    reason=op.reason,
                )

            return _callback

        def _elapsed_ms() -> int:
            return int((time.monotonic() - started) * 1000)

        # Start the clock before the try block so the failure path can
        # always report a duration.
        started = time.monotonic()
        try:
            self._emit("manager.apply.start", **common, bucket=bucket)
            # ``execute`` resolves to the module-level executor function.
            result = execute(
                p,
                self.provider,
                on_op_start=_op_event("operation_started"),
                on_op_done=_op_event("operation_completed"),
            )
        except Exception as e:
            message = str(e)
            if legacy:
                self._emit("s3.apply.error", bucket=bucket, error=message)
            self._emit("plan_failed", **common, error=message)
            self._emit(
                "manager.apply.done",
                **common,
                bucket=bucket,
                outcome="failed",
                duration_ms=_elapsed_ms(),
                plan_summary=summary,
                error=message,
            )
            raise

        # Success events are emitted outside the try block so a reporting
        # error cannot mark an already-applied plan as failed.
        if legacy:
            self._emit("s3.apply.done", bucket=bucket, operations=summary["operations"])
        self._emit(
            "plan_completed",
            **common,
            ops=summary["operations"],
            types=summary["types"],
        )
        self._emit(
            "manager.apply.done",
            **common,
            bucket=bucket,
            outcome="applied",
            duration_ms=_elapsed_ms(),
            plan_summary=summary,
        )
        return result

    def execute(
        self,
        desired: S3BucketDesired,
        *,
        options: ExecuteOptions | None = None,
    ) -> S3ManagerResult:
        """Read current state, plan, apply if needed, and report the outcome.

        Args:
            desired: Target bucket state.
            options: Run options. Defaults to ``ExecuteOptions()``. When
                ``options.apply_options`` is ``None``, apply options are
                derived from ``options.emit_legacy_events``.

        Returns:
            An ``S3ManagerResult`` whose ``outcome`` is ``"noop"`` when the
            plan has nothing to do and ``"applied"`` otherwise.

        Raises:
            Exception: Whatever :meth:`apply_plan` raises. A
                ``manager.execute.done`` event with ``outcome="failed"`` is
                emitted before the error is re-raised.
        """
        if options is None:
            options = ExecuteOptions()

        plan_options = options.plan_options
        if plan_options is None:
            plan_options = self.plan_options

        # Honour the run-level legacy flag unless the caller supplied
        # explicit apply options, which take precedence.
        apply_options = options.apply_options
        if apply_options is None:
            apply_options = ApplyOptions(emit_legacy_events=options.emit_legacy_events)

        current = self.get_current(desired)
        p = self.plan(desired, current, options=plan_options)
        summary = self._plan_summary(p)

        bucket = desired.name
        done_fields = {
            "module": "dxaws-s3",
            "module_version": self._module_version(),
            "plan_id": p.metadata.get("plan_id"),
            "target": f"s3:bucket:{bucket}",
            "bucket": bucket,
        }

        if p.is_noop:
            # Nothing to apply: just report the provider's current outputs.
            result = self.provider.get_outputs(bucket)
            outcome = "noop"
        else:
            try:
                result = self.apply_plan(p, options=apply_options)
            except Exception as e:
                # apply_plan already emitted failure events
                self._emit(
                    "manager.execute.done",
                    **done_fields,
                    outcome="failed",
                    plan_summary=summary,
                    error=str(e),
                )
                raise
            outcome = "applied"

        self._emit(
            "manager.execute.done",
            **done_fields,
            outcome=outcome,
            plan_summary=summary,
        )
        return S3ManagerResult(
            desired=desired,
            current=current,
            plan=p,
            result=result,
            outcome=outcome,
        )