Module slack_bolt.request.internals
Functions
def build_context(context: BoltContext,
body: Dict[str, Any]) ‑> BoltContext-
Expand source code
def build_context(context: BoltContext, body: Dict[str, Any]) -> BoltContext: context["is_enterprise_install"] = extract_is_enterprise_install(body) enterprise_id = extract_enterprise_id(body) if enterprise_id: context["enterprise_id"] = enterprise_id team_id = extract_team_id(body) if team_id: context["team_id"] = team_id user_id = extract_user_id(body) if user_id: context["user_id"] = user_id # Actor IDs are useful for Events API on a Slack Connect channel actor_enterprise_id = extract_actor_enterprise_id(body) if actor_enterprise_id: context["actor_enterprise_id"] = actor_enterprise_id actor_team_id = extract_actor_team_id(body) if actor_team_id: context["actor_team_id"] = actor_team_id actor_user_id = extract_actor_user_id(body) if actor_user_id: context["actor_user_id"] = actor_user_id channel_id = extract_channel_id(body) if channel_id: context["channel_id"] = channel_id thread_ts = extract_thread_ts(body) if thread_ts: context["thread_ts"] = thread_ts function_execution_id = extract_function_execution_id(body) if function_execution_id is not None: context["function_execution_id"] = function_execution_id function_bot_access_token = extract_function_bot_access_token(body) if function_bot_access_token is not None: context["function_bot_access_token"] = function_bot_access_token inputs = extract_function_inputs(body) if inputs is not None: context["inputs"] = inputs if "response_url" in body: context["response_url"] = body["response_url"] elif "response_urls" in body: # In the case where response_url_enabled: true in a modal exists response_urls = body["response_urls"] if len(response_urls) >= 1: if len(response_urls) > 1: context.logger.debug(debug_multiple_response_urls_detected()) response_url = response_urls[0].get("response_url") context["response_url"] = response_url return context
def build_normalized_headers(headers: Dict[str, str | Sequence[str]] | None) ‑> Dict[str, Sequence[str]]
-
Expand source code
def build_normalized_headers(headers: Optional[Dict[str, Union[str, Sequence[str]]]]) -> Dict[str, Sequence[str]]: normalized_headers: Dict[str, Sequence[str]] = {} if headers is not None: for key, value in headers.items(): normalized_name = key.lower() if isinstance(value, list): normalized_headers[normalized_name] = value elif isinstance(value, str): normalized_headers[normalized_name] = [value] else: raise ValueError(f"Unsupported type ({type(value)}) of element in headers ({headers})") return normalized_headers
def debug_multiple_response_urls_detected() ‑> str
-
Expand source code
def debug_multiple_response_urls_detected() -> str: return ( "`response_urls` in the body has multiple URLs in it. " "If you would like to use non-primary one, " "please manually extract the one from body['response_urls']." )
def error_message_raw_body_required_in_http_mode() ‑> str
-
Expand source code
def error_message_raw_body_required_in_http_mode() -> str: return "`body` must be a raw string data when running in the HTTP server mode"
def extract_actor_enterprise_id(payload: Dict[str, Any]) ‑> str | None
-
Expand source code
def extract_actor_enterprise_id(payload: Dict[str, Any]) -> Optional[str]: if payload.get("is_ext_shared_channel") is True: if payload.get("type") == "event_callback": # For safety, we don't set actor IDs for the events like "file_shared", # which do not provide any team ID in $.event data. In the case, the IDs cannot be correct. event_team_id = payload.get("event", {}).get("user_team") or payload.get("event", {}).get("team") if event_team_id is not None and str(event_team_id).startswith("E"): return event_team_id if event_team_id == payload.get("team_id"): return payload.get("enterprise_id") return None return extract_enterprise_id(payload)
def extract_actor_team_id(payload: Dict[str, Any]) ‑> str | None
-
Expand source code
def extract_actor_team_id(payload: Dict[str, Any]) -> Optional[str]: if payload.get("is_ext_shared_channel") is True: if payload.get("type") == "event_callback": event_type = payload.get("event", {}).get("type") if event_type == "app_mention": # The $.event.user_team can be an enterprise_id in app_mention events. # In the scenario, there is no way to retrieve actor_team_id as of March 2023 user_team = payload.get("event", {}).get("user_team") if user_team is None: # working with an app installed in this user's org/workspace side return payload.get("event", {}).get("team") if str(user_team).startswith("T"): # interacting from a connected non-grid workspace return user_team # Interacting from a connected grid workspace; in this case, team_id cannot be resolved as of March 2023 return None # For safety, we don't set actor IDs for the events like "file_shared", # which do not provide any team ID in $.event data. In the case, the IDs cannot be correct. event_user_team = payload.get("event", {}).get("user_team") if event_user_team is not None: if str(event_user_team).startswith("T"): return event_user_team elif str(event_user_team).startswith("E"): if event_user_team == payload.get("enterprise_id"): return payload.get("team_id") elif event_user_team == payload.get("context_enterprise_id"): return payload.get("context_team_id") event_team = payload.get("event", {}).get("team") if event_team is not None: if str(event_team).startswith("T"): return event_team elif str(event_team).startswith("E"): if event_team == payload.get("enterprise_id"): return payload.get("team_id") elif event_team == payload.get("context_enterprise_id"): return payload.get("context_team_id") return None return extract_team_id(payload)
def extract_actor_user_id(payload: Dict[str, Any]) ‑> str | None
-
Expand source code
def extract_actor_user_id(payload: Dict[str, Any]) -> Optional[str]: if payload.get("is_ext_shared_channel") is True: if payload.get("type") == "event_callback": event = payload.get("event") if event is None: return None if extract_actor_enterprise_id(payload) is None and extract_actor_team_id(payload) is None: # When both enterprise_id and team_id are not identified, we skip returning user_id too for safety return None return event.get("user") or event.get("user_id") return extract_user_id(payload)
def extract_channel_id(payload: Dict[str, Any]) ‑> str | None
-
Expand source code
def extract_channel_id(payload: Dict[str, Any]) -> Optional[str]: channel = payload.get("channel") if channel is not None: if isinstance(channel, str): return channel elif "id" in channel: return channel.get("id") if "channel_id" in payload: return payload.get("channel_id") if payload.get("event") is not None: return extract_channel_id(payload["event"]) if payload.get("item") is not None: # reaction_added: body["event"]["item"] return extract_channel_id(payload["item"]) if payload.get("assistant_thread") is not None: # assistant_thread_started return extract_channel_id(payload["assistant_thread"]) return None
def extract_content_type(headers: Dict[str, Sequence[str]]) ‑> str | None
-
Expand source code
def extract_content_type(headers: Dict[str, Sequence[str]]) -> Optional[str]: content_type: Optional[str] = headers.get("content-type", [None])[0] if content_type: return content_type.split(";")[0] return None
def extract_enterprise_id(payload: Dict[str, Any]) ‑> str | None
-
Expand source code
def extract_enterprise_id(payload: Dict[str, Any]) -> Optional[str]: org = payload.get("enterprise") if org is not None: if isinstance(org, str): return org elif "id" in org: return org.get("id") if payload.get("authorizations") is not None and len(payload["authorizations"]) > 0: # To make Events API handling functioning also for shared channels, # we should use .authorizations[0].enterprise_id over .enterprise_id return extract_enterprise_id(payload["authorizations"][0]) if "enterprise_id" in payload: return payload.get("enterprise_id") if payload.get("team") is not None and "enterprise_id" in payload["team"]: # In the case where the type is view_submission return payload["team"].get("enterprise_id") if payload.get("event") is not None: return extract_enterprise_id(payload["event"]) return None
def extract_function_bot_access_token(payload: Dict[str, Any]) ‑> str | None
-
Expand source code
def extract_function_bot_access_token(payload: Dict[str, Any]) -> Optional[str]: if payload.get("bot_access_token") is not None: return payload.get("bot_access_token") if payload.get("event") is not None: return payload["event"].get("bot_access_token") return None
def extract_function_execution_id(payload: Dict[str, Any]) ‑> str | None
-
Expand source code
def extract_function_execution_id(payload: Dict[str, Any]) -> Optional[str]: if payload.get("function_execution_id") is not None: return payload.get("function_execution_id") if payload.get("event") is not None: return extract_function_execution_id(payload["event"]) if payload.get("function_data") is not None: return payload["function_data"].get("execution_id") return None
def extract_function_inputs(payload: Dict[str, Any]) ‑> Dict[str, Any] | None
-
Expand source code
def extract_function_inputs(payload: Dict[str, Any]) -> Optional[Dict[str, Any]]: if payload.get("event") is not None: return payload["event"].get("inputs") if payload.get("function_data") is not None: return payload["function_data"].get("inputs") return None
def extract_is_enterprise_install(payload: Dict[str, Any]) ‑> bool | None
-
Expand source code
def extract_is_enterprise_install(payload: Dict[str, Any]) -> Optional[bool]: if payload.get("authorizations") is not None and len(payload["authorizations"]) > 0: # To make Events API handling functioning also for shared channels, # we should use .authorizations[0].is_enterprise_install over .is_enterprise_install return extract_is_enterprise_install(payload["authorizations"][0]) if "is_enterprise_install" in payload: is_enterprise_install = payload.get("is_enterprise_install") return is_enterprise_install is not None and (is_enterprise_install is True or is_enterprise_install == "true") return False
def extract_team_id(payload: Dict[str, Any]) ‑> str | None
-
Expand source code
def extract_team_id(payload: Dict[str, Any]) -> Optional[str]: app_installed_team_id = payload.get("view", {}).get("app_installed_team_id") if app_installed_team_id is not None: # view_submission payloads can have `view.app_installed_team_id` when a modal view that was opened # in a different workspace via some operations inside a Slack Connect channel. # Note that the same for enterprise_id does not exist. When you need to know the enterprise_id as well, # you have to run some query toward your InstallationStore to know the org where the team_id belongs to. return app_installed_team_id if payload.get("team") is not None: # With org-wide installations, payload.team in interactivity payloads can be None # You need to extract either payload.user.team_id or payload.view.team_id as below team = payload.get("team") if isinstance(team, str): return team elif team and "id" in team: return team.get("id") if payload.get("authorizations") is not None and len(payload["authorizations"]) > 0: # To make Events API handling functioning also for shared channels, # we should use .authorizations[0].team_id over .team_id return extract_team_id(payload["authorizations"][0]) if "team_id" in payload: return payload.get("team_id") if payload.get("event") is not None: return extract_team_id(payload["event"]) if payload.get("user") is not None: return payload["user"]["team_id"] if payload.get("view") is not None: return payload.get("view", {})["team_id"] return None
def extract_thread_ts(payload: Dict[str, Any]) ‑> str | None
-
Expand source code
def extract_thread_ts(payload: Dict[str, Any]) -> Optional[str]: # This utility initially supports only the use cases for AI assistants, but it may be fine to add more patterns. # That said, note that thread_ts is always required for assistant threads, but it's not for channels. # Thus, blindly setting this thread_ts to say utility can break existing apps' behaviors. if is_assistant_event(payload): event = payload["event"] if ( event.get("assistant_thread") is not None and event["assistant_thread"].get("channel_id") is not None and event["assistant_thread"].get("thread_ts") is not None ): # assistant_thread_started, assistant_thread_context_changed # "assistant_thread" property can exist for message event without channel_id and thread_ts # Thus, the above if check verifies these properties exist return event["assistant_thread"]["thread_ts"] elif event.get("channel") is not None: if event.get("thread_ts") is not None: # message in an assistant thread return event["thread_ts"] elif event.get("message", {}).get("thread_ts") is not None: # message_changed return event["message"]["thread_ts"] elif event.get("previous_message", {}).get("thread_ts") is not None: # message_deleted return event["previous_message"]["thread_ts"] return None
def extract_user_id(payload: Dict[str, Any]) ‑> str | None
-
Expand source code
def extract_user_id(payload: Dict[str, Any]) -> Optional[str]: user = payload.get("user") if user is not None: if isinstance(user, str): return user elif "id" in user: return user.get("id") if "user_id" in payload: return payload.get("user_id") if payload.get("event") is not None: return extract_user_id(payload["event"]) if payload.get("message") is not None: # message_changed: body["event"]["message"] return extract_user_id(payload["message"]) if payload.get("previous_message") is not None: # message_deleted: body["event"]["previous_message"] return extract_user_id(payload["previous_message"]) return None
def parse_body(body: str, content_type: str | None) ‑> Dict[str, Any]
-
Expand source code
def parse_body(body: str, content_type: Optional[str]) -> Dict[str, Any]: if not body: return {} if (content_type is not None and content_type == "application/json") or body.startswith("{"): return json.loads(body) else: if "payload" in body: # This is not JSON format yet params = dict(parse_qsl(body, keep_blank_values=True)) payload = params.get("payload") if payload is not None: return json.loads(payload) else: return {} else: return dict(parse_qsl(body, keep_blank_values=True))
def parse_query(query: str | Dict[str, str] | Dict[str, Sequence[str]] | None) ‑> Dict[str, Sequence[str]]
-
Expand source code
def parse_query(query: Optional[Union[str, Dict[str, str], Dict[str, Sequence[str]]]]) -> Dict[str, Sequence[str]]: if query is None: return {} elif isinstance(query, str): return dict(parse_qs(query, keep_blank_values=True)) elif isinstance(query, dict) or hasattr(query, "items"): result: Dict[str, Sequence[str]] = {} for name, value in query.items(): if isinstance(value, list): result[name] = value elif isinstance(value, str): result[name] = [value] else: raise ValueError(f"Unsupported type ({type(value)}) of element in headers ({query})") return result else: raise ValueError(f"Unsupported type of query detected ({type(query)})")