Source code for dxaws_s3.executor

"""Executor for dxaws-s3."""

from __future__ import annotations

from .models import ActionType, S3BucketPlan, S3BucketResult
from .providers.base import S3Provider


[docs] def execute( plan: S3BucketPlan, provider: S3Provider, *, on_op_start=None, on_op_done=None, ) -> S3BucketResult: desired = plan.desired created = False for idx, a in enumerate(plan.operations): if on_op_start: on_op_start(idx, a) if a.type == ActionType.CREATE_BUCKET: # Enforce the expectation that a plan only creates a bucket once. if created: raise ValueError( f"Plan contains multiple CREATE_BUCKET operations for {desired.name}" ) provider.create_bucket(desired) created = True elif a.type == ActionType.PUT_PUBLIC_ACCESS_BLOCK: provider.put_public_access_block( desired.name, bool(a.payload.get("enabled", True)), ) elif a.type == ActionType.PUT_ENCRYPTION: provider.put_encryption( desired.name, str(a.payload.get("encryption")), a.payload.get("kms_key_arn"), ) elif a.type == ActionType.PUT_VERSIONING: provider.put_versioning( desired.name, bool(a.payload.get("enabled", False)), ) elif a.type == ActionType.PUT_BUCKET_POLICY: provider.put_bucket_policy( desired.name, str(a.payload.get("policy_json")), ) elif a.type == ActionType.PUT_TAGS: provider.put_tags( desired.name, dict(a.payload.get("tags") or {}), ) else: raise ValueError(f"Unknown action type: {a.type!r}") if on_op_done: on_op_done(idx, a) return provider.get_outputs(desired.name)