Module slack_bolt.adapter.asgi.http_request
Classes
class AsgiHttpRequest (scope: Dict[str, str | bytes | Iterable[Tuple[bytes, bytes]]],
receive: Callable)-
Expand source code
class AsgiHttpRequest:
    """Read-only view over one ASGI HTTP request.

    Holds the server's ``receive`` callable, the decoded query string and the
    raw header pairs taken from the connection scope.
    """

    __slots__ = ("receive", "query_string", "raw_headers")

    def __init__(self, scope: scope_type, receive: Callable):
        """Capture the request data exposed by the ASGI server.

        Args:
            scope: Connection scope mapping. ``query_string`` is read as bytes
                and decoded with ``ENCODING``; ``headers`` is read as an
                iterable of ``(name, value)`` byte pairs.
            receive: Awaitable callable returning the next ASGI event message.

        Raises:
            KeyError: If ``scope`` has no ``query_string`` entry.
        """
        self.receive = receive
        self.query_string = str(scope["query_string"], ENCODING)  # type: ignore[arg-type]
        # "headers" is optional in the ASGI HTTP scope; treat a missing (or None)
        # entry as "no headers" instead of failing with KeyError.
        self.raw_headers: Iterable[Tuple[bytes, bytes]] = scope.get("headers") or []  # type: ignore[assignment]

    def get_headers(self) -> Dict[str, Union[str, Sequence[str]]]:
        """Decode the raw header pairs into a name -> value mapping.

        Returns:
            A dict keyed by the decoded header name. A name that occurs once
            maps to its decoded value as a ``str``; a name that occurs several
            times maps to a ``list`` of its decoded values in received order,
            so repeated headers are no longer collapsed to the last one.
        """
        grouped: Dict[str, list] = {}
        for header in self.raw_headers:
            grouped.setdefault(str(header[0], ENCODING), []).append(str(header[1], ENCODING))
        # Keep the plain-string shape for the common single-value case.
        return {name: values[0] if len(values) == 1 else values for name, values in grouped.items()}

    async def get_raw_body(self) -> str:
        """Receive the complete request body and return it as text.

        Awaits ``receive`` repeatedly, concatenating each message's ``body``
        until a message arrives whose ``more_body`` flag is falsy or absent.

        Returns:
            The accumulated body bytes decoded with ``ENCODING``.

        Raises:
            Exception: If a received message's ``type`` is not ``"http.request"``.
        """
        chunks = bytearray()
        while True:
            chunk: Dict[str, Union[str, bytes]] = await self.receive()
            if chunk["type"] != "http.request":
                raise Exception("Body chunks could not be received from asgi server")
            # A message may omit "body"; treat that as an empty chunk.
            chunks.extend(chunk.get("body", b""))  # type: ignore[arg-type]
            if not chunk.get("more_body", False):
                break
        return bytes(chunks).decode(ENCODING)
Instance variables
var query_string
-
Expand source code
class AsgiHttpRequest:
    """Read-only view over one ASGI HTTP request.

    Holds the server's ``receive`` callable, the decoded query string and the
    raw header pairs taken from the connection scope.
    """

    __slots__ = ("receive", "query_string", "raw_headers")

    def __init__(self, scope: scope_type, receive: Callable):
        """Capture the request data exposed by the ASGI server.

        Args:
            scope: Connection scope mapping. ``query_string`` is read as bytes
                and decoded with ``ENCODING``; ``headers`` is read as an
                iterable of ``(name, value)`` byte pairs.
            receive: Awaitable callable returning the next ASGI event message.

        Raises:
            KeyError: If ``scope`` has no ``query_string`` entry.
        """
        self.receive = receive
        self.query_string = str(scope["query_string"], ENCODING)  # type: ignore[arg-type]
        # "headers" is optional in the ASGI HTTP scope; treat a missing (or None)
        # entry as "no headers" instead of failing with KeyError.
        self.raw_headers: Iterable[Tuple[bytes, bytes]] = scope.get("headers") or []  # type: ignore[assignment]

    def get_headers(self) -> Dict[str, Union[str, Sequence[str]]]:
        """Decode the raw header pairs into a name -> value mapping.

        Returns:
            A dict keyed by the decoded header name. A name that occurs once
            maps to its decoded value as a ``str``; a name that occurs several
            times maps to a ``list`` of its decoded values in received order,
            so repeated headers are no longer collapsed to the last one.
        """
        grouped: Dict[str, list] = {}
        for header in self.raw_headers:
            grouped.setdefault(str(header[0], ENCODING), []).append(str(header[1], ENCODING))
        # Keep the plain-string shape for the common single-value case.
        return {name: values[0] if len(values) == 1 else values for name, values in grouped.items()}

    async def get_raw_body(self) -> str:
        """Receive the complete request body and return it as text.

        Awaits ``receive`` repeatedly, concatenating each message's ``body``
        until a message arrives whose ``more_body`` flag is falsy or absent.

        Returns:
            The accumulated body bytes decoded with ``ENCODING``.

        Raises:
            Exception: If a received message's ``type`` is not ``"http.request"``.
        """
        chunks = bytearray()
        while True:
            chunk: Dict[str, Union[str, bytes]] = await self.receive()
            if chunk["type"] != "http.request":
                raise Exception("Body chunks could not be received from asgi server")
            # A message may omit "body"; treat that as an empty chunk.
            chunks.extend(chunk.get("body", b""))  # type: ignore[arg-type]
            if not chunk.get("more_body", False):
                break
        return bytes(chunks).decode(ENCODING)
var raw_headers
-
Expand source code
class AsgiHttpRequest:
    """Read-only view over one ASGI HTTP request.

    Holds the server's ``receive`` callable, the decoded query string and the
    raw header pairs taken from the connection scope.
    """

    __slots__ = ("receive", "query_string", "raw_headers")

    def __init__(self, scope: scope_type, receive: Callable):
        """Capture the request data exposed by the ASGI server.

        Args:
            scope: Connection scope mapping. ``query_string`` is read as bytes
                and decoded with ``ENCODING``; ``headers`` is read as an
                iterable of ``(name, value)`` byte pairs.
            receive: Awaitable callable returning the next ASGI event message.

        Raises:
            KeyError: If ``scope`` has no ``query_string`` entry.
        """
        self.receive = receive
        self.query_string = str(scope["query_string"], ENCODING)  # type: ignore[arg-type]
        # "headers" is optional in the ASGI HTTP scope; treat a missing (or None)
        # entry as "no headers" instead of failing with KeyError.
        self.raw_headers: Iterable[Tuple[bytes, bytes]] = scope.get("headers") or []  # type: ignore[assignment]

    def get_headers(self) -> Dict[str, Union[str, Sequence[str]]]:
        """Decode the raw header pairs into a name -> value mapping.

        Returns:
            A dict keyed by the decoded header name. A name that occurs once
            maps to its decoded value as a ``str``; a name that occurs several
            times maps to a ``list`` of its decoded values in received order,
            so repeated headers are no longer collapsed to the last one.
        """
        grouped: Dict[str, list] = {}
        for header in self.raw_headers:
            grouped.setdefault(str(header[0], ENCODING), []).append(str(header[1], ENCODING))
        # Keep the plain-string shape for the common single-value case.
        return {name: values[0] if len(values) == 1 else values for name, values in grouped.items()}

    async def get_raw_body(self) -> str:
        """Receive the complete request body and return it as text.

        Awaits ``receive`` repeatedly, concatenating each message's ``body``
        until a message arrives whose ``more_body`` flag is falsy or absent.

        Returns:
            The accumulated body bytes decoded with ``ENCODING``.

        Raises:
            Exception: If a received message's ``type`` is not ``"http.request"``.
        """
        chunks = bytearray()
        while True:
            chunk: Dict[str, Union[str, bytes]] = await self.receive()
            if chunk["type"] != "http.request":
                raise Exception("Body chunks could not be received from asgi server")
            # A message may omit "body"; treat that as an empty chunk.
            chunks.extend(chunk.get("body", b""))  # type: ignore[arg-type]
            if not chunk.get("more_body", False):
                break
        return bytes(chunks).decode(ENCODING)
var receive
-
Expand source code
class AsgiHttpRequest:
    """Read-only view over one ASGI HTTP request.

    Holds the server's ``receive`` callable, the decoded query string and the
    raw header pairs taken from the connection scope.
    """

    __slots__ = ("receive", "query_string", "raw_headers")

    def __init__(self, scope: scope_type, receive: Callable):
        """Capture the request data exposed by the ASGI server.

        Args:
            scope: Connection scope mapping. ``query_string`` is read as bytes
                and decoded with ``ENCODING``; ``headers`` is read as an
                iterable of ``(name, value)`` byte pairs.
            receive: Awaitable callable returning the next ASGI event message.

        Raises:
            KeyError: If ``scope`` has no ``query_string`` entry.
        """
        self.receive = receive
        self.query_string = str(scope["query_string"], ENCODING)  # type: ignore[arg-type]
        # "headers" is optional in the ASGI HTTP scope; treat a missing (or None)
        # entry as "no headers" instead of failing with KeyError.
        self.raw_headers: Iterable[Tuple[bytes, bytes]] = scope.get("headers") or []  # type: ignore[assignment]

    def get_headers(self) -> Dict[str, Union[str, Sequence[str]]]:
        """Decode the raw header pairs into a name -> value mapping.

        Returns:
            A dict keyed by the decoded header name. A name that occurs once
            maps to its decoded value as a ``str``; a name that occurs several
            times maps to a ``list`` of its decoded values in received order,
            so repeated headers are no longer collapsed to the last one.
        """
        grouped: Dict[str, list] = {}
        for header in self.raw_headers:
            grouped.setdefault(str(header[0], ENCODING), []).append(str(header[1], ENCODING))
        # Keep the plain-string shape for the common single-value case.
        return {name: values[0] if len(values) == 1 else values for name, values in grouped.items()}

    async def get_raw_body(self) -> str:
        """Receive the complete request body and return it as text.

        Awaits ``receive`` repeatedly, concatenating each message's ``body``
        until a message arrives whose ``more_body`` flag is falsy or absent.

        Returns:
            The accumulated body bytes decoded with ``ENCODING``.

        Raises:
            Exception: If a received message's ``type`` is not ``"http.request"``.
        """
        chunks = bytearray()
        while True:
            chunk: Dict[str, Union[str, bytes]] = await self.receive()
            if chunk["type"] != "http.request":
                raise Exception("Body chunks could not be received from asgi server")
            # A message may omit "body"; treat that as an empty chunk.
            chunks.extend(chunk.get("body", b""))  # type: ignore[arg-type]
            if not chunk.get("more_body", False):
                break
        return bytes(chunks).decode(ENCODING)
Methods
def get_headers(self) ‑> Dict[str, str | Sequence[str]]
-
Expand source code
def get_headers(self) -> Dict[str, Union[str, Sequence[str]]]:
    """Decode ``self.raw_headers`` into a name -> value mapping.

    Returns:
        A dict keyed by the decoded header name. A name that occurs once maps
        to its decoded value as a ``str``; a name that occurs several times
        maps to a ``list`` of its decoded values in received order, so
        repeated headers are no longer collapsed to the last one.
    """
    grouped: Dict[str, list] = {}
    for header in self.raw_headers:
        grouped.setdefault(str(header[0], ENCODING), []).append(str(header[1], ENCODING))
    # Keep the plain-string shape for the common single-value case.
    return {name: values[0] if len(values) == 1 else values for name, values in grouped.items()}
async def get_raw_body(self) ‑> str
-
Expand source code
async def get_raw_body(self) -> str:
    """Drain the request body from the server and return it as text.

    Each awaited ``self.receive()`` message contributes its ``body`` bytes
    (empty when the key is absent). Reading stops after the first message
    whose ``more_body`` entry is falsy or missing.

    Returns:
        The concatenated body bytes decoded with ``ENCODING``.

    Raises:
        Exception: If a received message's ``type`` is not ``"http.request"``.
    """
    buffer = bytearray()
    expecting_more = True
    while expecting_more:
        message: Dict[str, Union[str, bytes]] = await self.receive()
        if message["type"] != "http.request":
            raise Exception("Body chunks could not be received from asgi server")
        buffer.extend(message.get("body", b""))  # type: ignore[arg-type]
        # The loop ends once the server stops signalling further chunks.
        expecting_more = bool(message.get("more_body", False))
    return bytes(buffer).decode(ENCODING)