Module slack_bolt.adapter.falcon

Sub-modules

slack_bolt.adapter.falcon.async_resource
slack_bolt.adapter.falcon.resource

Classes

class SlackAppResource (app: App)
Expand source code
class SlackAppResource:
    """Falcon resource that routes HTTP requests to a Bolt ``App``.

    GET requests are served only when the app has an OAuth flow configured
    and the request path matches its install or redirect path; every POST
    request is dispatched to the app.

    Example::

        from slack_bolt import App
        app = App()

        import falcon
        api = application = falcon.API()
        api.add_route("/slack/events", SlackAppResource(app))
    """

    def __init__(self, app: App):
        self.app = app

    def on_get(self, req: Request, resp: Response):
        """Handle the OAuth install / callback pages; reply 404 otherwise."""
        if self.app.oauth_flow is not None:
            oauth_flow: OAuthFlow = self.app.oauth_flow
            if req.path == oauth_flow.install_path:
                bolt_resp = oauth_flow.handle_installation(self._to_bolt_request(req))
                self._write_response(bolt_resp, resp)
                return
            elif req.path == oauth_flow.redirect_uri_path:
                bolt_resp = oauth_flow.handle_callback(self._to_bolt_request(req))
                self._write_response(bolt_resp, resp)
                return

        # Use a full status line ("<code> <phrase>"), the same shape
        # _write_response produces, instead of a bare status code.
        resp.status = "404 Not Found"
        self._set_response_text(resp, "The page is not found...")

    def on_post(self, req: Request, resp: Response):
        """Dispatch the incoming request to the Bolt app and write its response."""
        bolt_req = self._to_bolt_request(req)
        bolt_resp = self.app.dispatch(bolt_req)
        self._write_response(bolt_resp, resp)

    def _to_bolt_request(self, req: Request) -> BoltRequest:
        """Build a ``BoltRequest`` from the Falcon request.

        The body is read from ``req.stream`` (zero bytes when no content
        length is set) and decoded as UTF-8; header names are lower-cased.
        """
        return BoltRequest(
            body=req.stream.read(req.content_length or 0).decode("utf-8"),
            query=req.query_string,
            headers={k.lower(): v for k, v in req.headers.items()},
        )

    @staticmethod
    def _set_response_text(resp: Response, text: str) -> None:
        """Set the response body text using the attribute this Falcon version expects."""
        if falcon_version.__version__.startswith("2."):
            # Falcon 4.x w/ mypy fails to correctly infer the str type here
            resp.body = text  # type: ignore[assignment]
        else:
            resp.text = text

    def _write_response(self, bolt_resp: BoltResponse, resp: Response):
        """Copy body, status, headers and cookies from ``bolt_resp`` onto ``resp``."""
        self._set_response_text(resp, bolt_resp.body)

        status = HTTPStatus(bolt_resp.status)
        resp.status = f"{status.value} {status.phrase}"
        resp.set_headers(bolt_resp.first_headers_without_set_cookie())
        for cookie in bolt_resp.cookies():
            for name, c in cookie.items():
                expire_value = c.get("expires")
                # A missing/empty "expires" attribute is passed on as None.
                expire = datetime.strptime(expire_value, "%a, %d %b %Y %H:%M:%S %Z") if expire_value else None
                resp.set_cookie(
                    name=name,
                    value=c.value,
                    expires=expire,
                    max_age=c.get("max-age"),
                    domain=c.get("domain"),
                    path=c.get("path"),
                    # Cookies are always written as Secure and HttpOnly.
                    secure=True,
                    http_only=True,
                )

from slack_bolt import App
app = App()

import falcon
api = application = falcon.API()
api.add_route("/slack/events", SlackAppResource(app))

Methods

def on_get(self, req: falcon.request.Request, resp: falcon.response.Response)
Expand source code
def on_get(self, req: Request, resp: Response):
    """Handle the OAuth install / callback pages; reply 404 otherwise.

    Only when the app has an OAuth flow and ``req.path`` equals its install
    path or redirect URI path is the request handed to that flow. Any other
    GET gets a 404 response.
    """
    if self.app.oauth_flow is not None:
        oauth_flow: OAuthFlow = self.app.oauth_flow
        if req.path == oauth_flow.install_path:
            bolt_resp = oauth_flow.handle_installation(self._to_bolt_request(req))
            self._write_response(bolt_resp, resp)
            return
        elif req.path == oauth_flow.redirect_uri_path:
            bolt_resp = oauth_flow.handle_callback(self._to_bolt_request(req))
            self._write_response(bolt_resp, resp)
            return

    # Use a full status line ("<code> <phrase>") rather than a bare code.
    resp.status = "404 Not Found"
    # Pick the body attribute by Falcon version, as _write_response does:
    # `body` on Falcon 2.x, `text` on later versions.
    if falcon_version.__version__.startswith("2."):
        # Falcon 4.x w/ mypy fails to correctly infer the str type here
        resp.body = "The page is not found..."  # type: ignore[assignment]
    else:
        resp.text = "The page is not found..."
def on_post(self, req: falcon.request.Request, resp: falcon.response.Response)
Expand source code
def on_post(self, req: Request, resp: Response):
    """Convert the Falcon request, dispatch it to the app, and write the result to ``resp``."""
    outcome = self.app.dispatch(self._to_bolt_request(req))
    self._write_response(outcome, resp)