Source code for dxaws_s3.executor
"""Executor for dxaws-s3."""
from __future__ import annotations
from .models import ActionType, S3BucketPlan, S3BucketResult
from .providers.base import S3Provider
[docs]
def execute(
    plan: S3BucketPlan,
    provider: S3Provider,
    *,
    on_op_start=None,
    on_op_done=None,
) -> S3BucketResult:
    """Apply every operation of *plan* through *provider*, in plan order.

    Each operation is dispatched on its ``type`` to the matching provider
    call, always targeting the bucket described by ``plan.desired``.
    Execution stops at the first operation that raises; this function does
    not roll back operations that were already applied.

    Args:
        plan: Plan whose ``operations`` are applied in order and whose
            ``desired`` bucket supplies the bucket name.
        provider: Backend that performs the individual bucket calls.
        on_op_start: Optional callback, invoked as
            ``on_op_start(index, operation)`` right before an operation is
            dispatched.
        on_op_done: Optional callback, invoked as
            ``on_op_done(index, operation)`` after an operation completed.
            It is not invoked for an operation that raised.

    Returns:
        The value of ``provider.get_outputs(desired.name)``, fetched once
        all operations have been applied.

    Raises:
        ValueError: If the plan contains more than one ``CREATE_BUCKET``
            operation, if a ``PUT_BUCKET_POLICY`` operation carries no
            ``policy_json``, or if an operation has an unknown type.
    """
    desired = plan.desired
    created = False

    for idx, a in enumerate(plan.operations):
        if on_op_start:
            on_op_start(idx, a)

        if a.type == ActionType.CREATE_BUCKET:
            # Enforce the expectation that a plan only creates a bucket once.
            if created:
                raise ValueError(
                    f"Plan contains multiple CREATE_BUCKET operations for {desired.name}"
                )
            provider.create_bucket(desired)
            created = True
        elif a.type == ActionType.PUT_PUBLIC_ACCESS_BLOCK:
            # A missing "enabled" flag means the block is switched on.
            provider.put_public_access_block(
                desired.name,
                bool(a.payload.get("enabled", True)),
            )
        elif a.type == ActionType.PUT_ENCRYPTION:
            # NOTE(review): a missing "encryption" value is forwarded as the
            # literal string "None"; confirm the providers expect that.
            provider.put_encryption(
                desired.name,
                str(a.payload.get("encryption")),
                a.payload.get("kms_key_arn"),
            )
        elif a.type == ActionType.PUT_VERSIONING:
            # A missing "enabled" flag means versioning is switched off.
            provider.put_versioning(
                desired.name,
                bool(a.payload.get("enabled", False)),
            )
        elif a.type == ActionType.PUT_BUCKET_POLICY:
            policy_json = a.payload.get("policy_json")
            if policy_json is None:
                # str(None) would otherwise hand the provider the text "None"
                # as the policy document, which is not valid JSON.
                raise ValueError(
                    f"PUT_BUCKET_POLICY operation for {desired.name} "
                    "has no policy_json in its payload"
                )
            provider.put_bucket_policy(desired.name, str(policy_json))
        elif a.type == ActionType.PUT_TAGS:
            # Copy the tags so the provider never receives the plan's own
            # mapping; a missing or empty value becomes an empty dict.
            provider.put_tags(
                desired.name,
                dict(a.payload.get("tags") or {}),
            )
        else:
            raise ValueError(f"Unknown action type: {a.type!r}")

        if on_op_done:
            on_op_done(idx, a)

    return provider.get_outputs(desired.name)