Module slack_sdk.socket_mode.client
Classes
class BaseSocketModeClient
-
Expand source code
class BaseSocketModeClient: logger: Logger web_client: WebClient app_token: str wss_uri: str message_queue: Queue message_listeners: List[ Union[ WebSocketMessageListener, Callable[["BaseSocketModeClient", dict, Optional[str]], None], ] ] socket_mode_request_listeners: List[ Union[ SocketModeRequestListener, Callable[["BaseSocketModeClient", SocketModeRequest], None], ] ] message_processor: IntervalRunner message_workers: ThreadPoolExecutor closed: bool connect_operation_lock: Lock def issue_new_wss_url(self) -> str: try: response = self.web_client.apps_connections_open(app_token=self.app_token) return response["url"] except SlackApiError as e: if e.response["error"] == "ratelimited": # NOTE: ratelimited errors rarely occur with this endpoint delay = int(e.response.headers.get("Retry-After", "30")) # Tier1 self.logger.info(f"Rate limited. Retrying in {delay} seconds...") time.sleep(delay) # Retry to issue a new WSS URL return self.issue_new_wss_url() else: # other errors self.logger.error(f"Failed to retrieve WSS URL: {e}") raise e def is_connected(self) -> bool: return False def connect(self) -> None: raise NotImplementedError() def disconnect(self) -> None: raise NotImplementedError() def connect_to_new_endpoint(self, force: bool = False): try: self.connect_operation_lock.acquire(blocking=True, timeout=5) if force or not self.is_connected(): self.logger.info("Connecting to a new endpoint...") self.wss_uri = self.issue_new_wss_url() self.connect() self.logger.info("Connected to a new endpoint...") finally: self.connect_operation_lock.release() def close(self) -> None: self.closed = True self.disconnect() def send_message(self, message: str) -> None: raise NotImplementedError() def send_socket_mode_response(self, response: Union[Dict[str, Any], SocketModeResponse]) -> None: if isinstance(response, SocketModeResponse): self.send_message(json.dumps(response.to_dict())) else: self.send_message(json.dumps(response)) def enqueue_message(self, message: str): self.message_queue.put(message) if self.logger.level <= logging.DEBUG: self.logger.debug(f"A new message enqueued (current queue size: {self.message_queue.qsize()})") def process_message(self): try: raw_message = self.message_queue.get(timeout=1) if self.logger.level <= logging.DEBUG: self.logger.debug(f"A message dequeued (current queue size: {self.message_queue.qsize()})") if raw_message is not None: message: dict = {} if raw_message.startswith("{"): message = json.loads(raw_message) if message.get("type") == "disconnect": self.connect_to_new_endpoint(force=True) else: def _run_message_listeners(): self.run_message_listeners(message, raw_message) self.message_workers.submit(_run_message_listeners) except Empty: pass def run_message_listeners(self, message: dict, raw_message: str) -> None: type, envelope_id = message.get("type"), message.get("envelope_id") if self.logger.level <= logging.DEBUG: self.logger.debug(f"Message processing started (type: {type}, envelope_id: {envelope_id})") try: # just in case, adding the same logic to reconnect here if message.get("type") == "disconnect": self.connect_to_new_endpoint(force=True) return for listener in self.message_listeners: try: listener(self, message, raw_message) # type: ignore except Exception as e: self.logger.exception(f"Failed to run a message listener: {e}") if len(self.socket_mode_request_listeners) > 0: request = SocketModeRequest.from_dict(message) if request is not None: for listener in self.socket_mode_request_listeners: try: listener(self, request) # type: ignore except Exception as e: self.logger.exception(f"Failed to run a request listener: {e}") except Exception as e: self.logger.exception(f"Failed to run message listeners: {e}") finally: if self.logger.level <= logging.DEBUG: self.logger.debug(f"Message processing completed (type: {type}, envelope_id: {envelope_id})") def process_messages(self) -> None: while not self.closed: try: self.process_message() except Exception as e: self.logger.exception(f"Failed to process a message: {e}")
Subclasses
Class variables
var app_token : str
-
The type of the None singleton.
var closed : bool
-
The type of the None singleton.
var connect_operation_lock : _thread.lock
-
The type of the None singleton.
var logger : logging.Logger
-
The type of the None singleton.
var message_listeners : List[WebSocketMessageListener | Callable[[BaseSocketModeClient, dict, str | None], None]]
-
The type of the None singleton.
var message_processor : IntervalRunner
-
The type of the None singleton.
var message_queue : queue.Queue
-
The type of the None singleton.
var message_workers : concurrent.futures.thread.ThreadPoolExecutor
-
The type of the None singleton.
var socket_mode_request_listeners : List[SocketModeRequestListener | Callable[[BaseSocketModeClient, SocketModeRequest], None]]
-
The type of the None singleton.
var web_client : WebClient
-
The type of the None singleton.
var wss_uri : str
-
The type of the None singleton.
Methods
def close(self) ‑> None
def connect(self) ‑> None
def connect_to_new_endpoint(self, force: bool = False)
def disconnect(self) ‑> None
def enqueue_message(self, message: str)
def is_connected(self) ‑> bool
def issue_new_wss_url(self) ‑> str
def process_message(self)
def process_messages(self) ‑> None
def run_message_listeners(self, message: dict, raw_message: str) ‑> None
def send_message(self, message: str) ‑> None
def send_socket_mode_response(self,
response: Dict[str, Any] | SocketModeResponse) ‑> None