Module slack_bolt.adapter.asgi.base_handler

Classes

class BaseSlackRequestHandler
Expand source code
class BaseSlackRequestHandler:
    app: Union[App, "AsyncApp"]  # type: ignore[name-defined]
    path: str

    async def dispatch(self, request: AsgiHttpRequest) -> BoltResponse:
        """Dispatches a request to the Bolt App"""
        raise NotImplementedError

    async def handle_installation(self, request: AsgiHttpRequest) -> BoltResponse:
        """Handles installation of the OAuthFlow"""
        raise NotImplementedError

    async def handle_callback(self, request: AsgiHttpRequest) -> BoltResponse:
        """Handles the callback of the OAuthFlow"""
        raise NotImplementedError

    async def _get_http_response(self, method: str, path: str, request: AsgiHttpRequest) -> AsgiHttpResponse:
        if method == "GET":
            if self.app.oauth_flow is not None:
                if path == self.app.oauth_flow.install_path:
                    bolt_response: BoltResponse = await self.handle_installation(request)
                    return AsgiHttpResponse(
                        status=bolt_response.status, headers=bolt_response.headers, body=bolt_response.body
                    )
                elif path == self.app.oauth_flow.redirect_uri_path:
                    bolt_response = await self.handle_callback(request)
                    return AsgiHttpResponse(
                        status=bolt_response.status, headers=bolt_response.headers, body=bolt_response.body
                    )
        if method == "POST" and path == self.path:
            bolt_response = await self.dispatch(request)
            return AsgiHttpResponse(status=bolt_response.status, headers=bolt_response.headers, body=bolt_response.body)
        return AsgiHttpResponse(status=404, headers={"content-type": ["text/plain;charset=utf-8"]}, body="Not Found")

    async def _handle_lifespan(self, receive: Callable) -> Dict[str, str]:
        while True:
            lifespan = await receive()
            if lifespan["type"] == "lifespan.startup":
                """Do something before startup"""
                return {"type": "lifespan.startup.complete"}
            if lifespan["type"] == "lifespan.shutdown":
                """Do something before shutdown"""
                return {"type": "lifespan.shutdown.complete"}

    async def __call__(self, scope: scope_type, receive: Callable, send: Callable) -> None:
        if scope["type"] == "http":
            response: AsgiHttpResponse = await self._get_http_response(
                method=scope["method"], path=scope["path"], request=AsgiHttpRequest(scope, receive)  # type: ignore[arg-type]
            )
            await send(response.get_response_start())
            await send(response.get_response_body())
            return
        if scope["type"] == "lifespan":
            await send(await self._handle_lifespan(receive))
            return
        raise TypeError(f"Unsupported scope type: {scope['type']!r}")

Subclasses

Class variables

var app : Union[App, AsyncApp]
var path : str

Methods

async def dispatch(self, request: AsgiHttpRequest) ‑> BoltResponse

Dispatches a request to the Bolt App

async def handle_callback(self, request: AsgiHttpRequest) ‑> BoltResponse

Handles the callback of the OAuthFlow

async def handle_installation(self, request: AsgiHttpRequest) ‑> BoltResponse

Handles installation of the OAuthFlow