Module slack_bolt.adapter.falcon.resource
Classes
class SlackAppResource (app: App)
-
    from slack_bolt import App
    app = App()

    import falcon
    api = application = falcon.API()
    api.add_route("/slack/events", SlackAppResource(app))
Expand source code
class SlackAppResource:
    """Falcon resource that forwards Slack HTTP requests to a Bolt ``App``.

    ``GET`` requests are matched against the app's OAuth install and redirect
    paths (when an OAuth flow is configured); anything else gets a 404.
    ``POST`` requests are dispatched to the Bolt app.

    Example::

        from slack_bolt import App
        app = App()

        import falcon
        api = application = falcon.API()
        api.add_route("/slack/events", SlackAppResource(app))
    """

    def __init__(self, app: App):
        """Keep a reference to the Bolt ``app`` that will handle requests."""
        self.app = app

    def on_get(self, req: Request, resp: Response):
        """Serve the OAuth install / callback pages, or respond with 404.

        Args:
            req: The incoming Falcon request.
            resp: The Falcon response to populate.
        """
        if self.app.oauth_flow is not None:
            oauth_flow: OAuthFlow = self.app.oauth_flow
            if req.path == oauth_flow.install_path:
                bolt_resp = oauth_flow.handle_installation(self._to_bolt_request(req))
                self._write_response(bolt_resp, resp)
                return
            if req.path == oauth_flow.redirect_uri_path:
                bolt_resp = oauth_flow.handle_callback(self._to_bolt_request(req))
                self._write_response(bolt_resp, resp)
                return

        # No OAuth flow, or the path matched neither OAuth endpoint.
        # Use the same status-line format and the same version-aware body
        # attribute as _write_response so both response paths stay consistent.
        resp.status = self._status_line(HTTPStatus.NOT_FOUND)
        self._set_body(resp, "The page is not found...")

    def on_post(self, req: Request, resp: Response):
        """Dispatch the request to the Bolt app and write its response.

        Args:
            req: The incoming Falcon request.
            resp: The Falcon response to populate.
        """
        bolt_req = self._to_bolt_request(req)
        bolt_resp = self.app.dispatch(bolt_req)
        self._write_response(bolt_resp, resp)

    def _to_bolt_request(self, req: Request) -> BoltRequest:
        """Build a ``BoltRequest`` from a Falcon request.

        The body is read from ``req.stream`` (``content_length`` bytes, or 0
        when the length is missing) and decoded as UTF-8; header names are
        lower-cased.
        """
        raw_body = req.stream.read(req.content_length or 0)
        return BoltRequest(
            body=raw_body.decode("utf-8"),
            query=req.query_string,
            headers={k.lower(): v for k, v in req.headers.items()},
        )

    @staticmethod
    def _status_line(status: HTTPStatus) -> str:
        """Return ``status`` formatted as ``"<code> <phrase>"``."""
        return f"{status.value} {status.phrase}"

    @staticmethod
    def _set_body(resp: Response, body: str) -> None:
        """Assign the response body via the attribute this Falcon version uses.

        Falcon 2.x takes the body through ``resp.body``; every other version
        is given ``resp.text``.
        """
        if falcon_version.__version__.startswith("2."):
            # Falcon 4.x w/ mypy fails to correctly infer the str type here
            resp.body = body  # type: ignore[assignment]
        else:
            resp.text = body

    def _write_response(self, bolt_resp: BoltResponse, resp: Response):
        """Copy body, status, headers and cookies from ``bolt_resp`` to ``resp``.

        Args:
            bolt_resp: The response produced by Bolt.
            resp: The Falcon response to populate.
        """
        self._set_body(resp, bolt_resp.body)
        resp.status = self._status_line(HTTPStatus(bolt_resp.status))
        resp.set_headers(bolt_resp.first_headers_without_set_cookie())

        # Cookies are excluded from the headers above and set one by one here.
        for cookie in bolt_resp.cookies():
            for name, c in cookie.items():
                expire_value = c.get("expires")
                # "expires" is a formatted date string; convert it to a
                # datetime, or pass None when it is not set.
                expire = datetime.strptime(expire_value, "%a, %d %b %Y %H:%M:%S %Z") if expire_value else None
                resp.set_cookie(
                    name=name,
                    value=c.value,
                    expires=expire,
                    max_age=c.get("max-age"),
                    domain=c.get("domain"),
                    path=c.get("path"),
                    # Always forced on, regardless of the Bolt cookie's own flags.
                    secure=True,
                    http_only=True,
                )
Methods
def on_get(self, req: falcon.request.Request, resp: falcon.response.Response)
def on_post(self, req: falcon.request.Request, resp: falcon.response.Response)