Module slack_bolt.workflows.step.async_step_middleware

Classes

class AsyncWorkflowStepMiddleware (step: AsyncWorkflowStep)

Base middleware for step from app specific ones

Expand source code
class AsyncWorkflowStepMiddleware(AsyncMiddleware):
    """Base middleware for step from app specific ones"""

    def __init__(self, step: AsyncWorkflowStep):
        self.step = step

    async def async_process(
        self,
        *,
        req: AsyncBoltRequest,
        resp: BoltResponse,
        next: Callable[[], Awaitable[BoltResponse]],
    ) -> BoltResponse:

        if await self.step.edit.async_matches(req=req, resp=resp):
            resp = await self._run(self.step.edit, req, resp)
            if resp is not None:
                return resp
        elif await self.step.save.async_matches(req=req, resp=resp):
            resp = await self._run(self.step.save, req, resp)
            if resp is not None:
                return resp
        elif await self.step.execute.async_matches(req=req, resp=resp):
            resp = await self._run(self.step.execute, req, resp)
            if resp is not None:
                return resp

        return await next()

    @staticmethod
    async def _run(
        listener: AsyncListener,
        req: AsyncBoltRequest,
        resp: BoltResponse,
    ) -> Optional[BoltResponse]:
        resp, next_was_not_called = await listener.run_async_middleware(req=req, resp=resp)
        if next_was_not_called:
            return None

        return await req.context.listener_runner.run(
            request=req,
            response=resp,
            listener_name=get_name_for_callable(listener.ack_function),
            listener=listener,
        )

Ancestors

Inherited members