Module slack_bolt.workflows.step.async_step_middleware
Classes
class AsyncWorkflowStepMiddleware (step: AsyncWorkflowStep, listener_runner: AsyncioListenerRunner)
-
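Middleware that routes a request to the matching listener of a workflow step (edit, save, or execute) and otherwise passes it on to the next middleware.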
Expand source code
class AsyncWorkflowStepMiddleware(AsyncMiddleware):
    """Middleware that dispatches a request to a workflow step's listeners.

    The step's ``edit``, ``save`` and ``execute`` listeners are consulted in
    that order, and only the first one that matches the request is run. When
    nothing matches, or the matched listener yields no response, the request
    is handed on through ``next``.
    """

    def __init__(self, step: AsyncWorkflowStep, listener_runner: AsyncioListenerRunner):
        # The step supplies the edit/save/execute listeners; the runner is
        # what actually executes whichever listener gets selected.
        self.step = step
        self.listener_runner = listener_runner

    async def async_process(
        self,
        *,
        req: AsyncBoltRequest,
        resp: BoltResponse,
        next: Callable[[], Awaitable[BoltResponse]],
    ) -> BoltResponse:
        """Run the first matching step listener, or defer to ``next``.

        Args:
            req: The incoming request.
            resp: The response object built so far.
            next: Awaitable callback that continues the middleware chain.

        Returns:
            The response from the matched listener when it produced one,
            otherwise whatever ``next()`` returns.
        """
        for phase in ("edit", "save", "execute"):
            # Look the listener up lazily so later phases are only touched
            # when the earlier ones did not match.
            candidate = getattr(self.step, phase)
            if not await candidate.async_matches(req=req, resp=resp):
                continue
            outcome = await self._run(candidate, req, resp)
            if outcome is not None:
                return outcome
            # Only the first matching listener is ever tried.
            break
        return await next()

    async def _run(
        self,
        listener: AsyncListener,
        req: AsyncBoltRequest,
        resp: BoltResponse,
    ) -> Optional[BoltResponse]:
        """Run the listener's middleware and then the listener itself.

        Args:
            listener: The step listener selected for this request.
            req: The incoming request.
            resp: The response object built so far.

        Returns:
            ``None`` when the listener's middleware reports that the request
            should not proceed to the listener; otherwise the result of the
            listener runner.
        """
        # The second element of the pair is truthy when the listener's own
        # middleware chain did not call next (per the upstream naming).
        middleware_resp, halted = await listener.run_async_middleware(req=req, resp=resp)
        if not halted:
            return await self.listener_runner.run(
                request=req,
                response=middleware_resp,
                listener_name=get_name_for_callable(listener.ack_function),
                listener=listener,
            )
        return None
Ancestors
Inherited members