Module slack_bolt.workflows.step.utilities.async_complete

Classes

class AsyncComplete (*, client: slack_sdk.web.async_client.AsyncWebClient, body: dict)

`complete()` utility that notifies Slack that a workflow step has completed successfully during app execution.

async def execute(step, complete, fail):
    inputs = step["inputs"]
    # if everything was successful
    outputs = {
        "task_name": inputs["task_name"]["value"],
        "task_description": inputs["task_description"]["value"],
    }
    await complete(outputs=outputs)

ws = AsyncWorkflowStep(
    callback_id="add_task",
    edit=edit,
    save=save,
    execute=execute,
)
app.step(ws)

This utility is a thin wrapper around the `workflows.stepCompleted` API method. Refer to https://api.slack.com/methods/workflows.stepCompleted for details.

Expand source code
class AsyncComplete:
    """Awaitable `complete()` helper that reports a finished workflow step to Slack.

    An instance is handed to the step's `execute` listener; awaiting it marks
    the step as completed:

        async def execute(step, complete, fail):
            inputs = step["inputs"]
            # if everything was successful
            outputs = {
                "task_name": inputs["task_name"]["value"],
                "task_description": inputs["task_description"]["value"],
            }
            await complete(outputs=outputs)

        ws = AsyncWorkflowStep(
            callback_id="add_task",
            edit=edit,
            save=save,
            execute=execute,
        )
        app.step(ws)

    Internally this only forwards to the `workflows.stepCompleted` API method;
    see https://api.slack.com/methods/workflows.stepCompleted for the accepted
    arguments.
    """

    def __init__(self, *, client: AsyncWebClient, body: dict):
        # Keep the request payload and the client used for the API call.
        self.body = body
        self.client = client

    async def __call__(self, **kwargs) -> None:
        """Call `workflows.stepCompleted` for the step execution found in `self.body`.

        Any keyword arguments (e.g. `outputs=...`) are passed through to the
        API method unchanged.
        """
        send_completed = self.client.workflows_stepCompleted
        # The execution ID comes from the event payload of the incoming request.
        workflow_step = self.body["event"]["workflow_step"]
        execute_id = workflow_step["workflow_step_execute_id"]
        await send_completed(workflow_step_execute_id=execute_id, **kwargs)