Module slack_bolt.app.async_server
Classes
class AsyncSlackAppServer (port: int, path: str, app: AsyncApp, host: Optional[str] = None)
-
Standalone AIOHTTP Web Server. Refer to https://docs.aiohttp.org/en/stable/web.html for details of AIOHTTP.
Args
port
- The port to listen on
path
- The path to receive incoming requests from Slack
app
- The AsyncApp instance that is used for processing requests
host
- The hostname to serve the web endpoints. (Default: 0.0.0.0)
Expand source code
class AsyncSlackAppServer:
    """Standalone AIOHTTP web server that forwards HTTP requests to an `AsyncApp`.

    Refer to https://docs.aiohttp.org/en/stable/web.html for details of AIOHTTP.
    """

    port: int
    path: str
    host: str
    bolt_app: "AsyncApp"  # type: ignore[name-defined]
    web_app: web.Application

    def __init__(
        self,
        port: int,
        path: str,
        app: "AsyncApp",  # type: ignore[name-defined]
        host: Optional[str] = None,
    ):
        """Standalone AIOHTTP Web Server.
        Refer to https://docs.aiohttp.org/en/stable/web.html for details of AIOHTTP.

        Args:
            port: The port to listen on
            path: The path to receive incoming requests from Slack
            app: The `AsyncApp` instance that is used for processing requests
            host: The hostname to serve the web endpoints. (Default: 0.0.0.0)
        """
        self.port = port
        self.path = path
        self.host = "0.0.0.0" if host is None else host
        self.bolt_app: "AsyncApp" = app  # type: ignore[name-defined]
        self.web_app = web.Application()
        self._bolt_oauth_flow = self.bolt_app.oauth_flow

        # The two OAuth GET endpoints are registered only when the app has an
        # OAuth flow; the POST endpoint is always registered, and always last.
        route_defs = []
        if self._bolt_oauth_flow:
            route_defs.append(
                web.get(self._bolt_oauth_flow.install_path, self.handle_get_requests)
            )
            route_defs.append(
                web.get(
                    self._bolt_oauth_flow.redirect_uri_path,
                    self.handle_get_requests,
                )
            )
        route_defs.append(web.post(self.path, self.handle_post_requests))
        self.web_app.add_routes(route_defs)

    async def handle_get_requests(self, request: web.Request) -> web.Response:
        """Serve the OAuth install and redirect endpoints.

        Responds with 404 when no OAuth flow is configured or when the request
        path matches neither of the OAuth flow's paths.
        """
        flow = self._bolt_oauth_flow
        if not flow:
            return web.Response(status=404)

        if request.path == flow.install_path:
            incoming = await to_bolt_request(request)
            outcome = await flow.handle_installation(incoming)
            return await to_aiohttp_response(outcome)

        if request.path == flow.redirect_uri_path:
            incoming = await to_bolt_request(request)
            outcome = await flow.handle_callback(incoming)
            return await to_aiohttp_response(outcome)

        return web.Response(status=404)

    async def handle_post_requests(self, request: web.Request) -> web.Response:
        """Dispatch a POST request on `self.path` to the Bolt app.

        Responds with 404 when the request path differs from `self.path`.
        """
        if request.path != self.path:
            return web.Response(status=404)

        incoming = await to_bolt_request(request)
        outcome: BoltResponse = await self.bolt_app.async_dispatch(incoming)
        return await to_aiohttp_response(outcome)

    def start(self, host: Optional[str] = None) -> None:
        """Starts a new web server process.

        Args:
            host: When given, used instead of `self.host` for this run.
        """
        app_logger = self.bolt_app.logger
        # When the logger's own level is above INFO, the boot message is
        # printed instead of being logged.
        if app_logger.level <= logging.INFO:
            app_logger.info(get_boot_message())
        else:
            print(get_boot_message())

        bind_host = self.host if host is None else host
        web.run_app(self.web_app, host=bind_host, port=self.port)
Class variables
var bolt_app : AsyncApp
var host : str
var path : str
var port : int
var web_app : aiohttp.web_app.Application
Methods
async def handle_get_requests(self, request: aiohttp.web_request.Request) ‑> aiohttp.web_response.Response
async def handle_post_requests(self, request: aiohttp.web_request.Request) ‑> aiohttp.web_response.Response
def start(self, host: Optional[str] = None) ‑> None
-
Starts a new web server process.