Module slack_sdk.socket_mode.client

Classes

class BaseSocketModeClient
Expand source code
class BaseSocketModeClient:
    logger: Logger
    web_client: WebClient
    app_token: str
    wss_uri: str
    message_queue: Queue
    message_listeners: List[
        Union[
            WebSocketMessageListener,
            Callable[["BaseSocketModeClient", dict, Optional[str]], None],
        ]
    ]
    socket_mode_request_listeners: List[
        Union[
            SocketModeRequestListener,
            Callable[["BaseSocketModeClient", SocketModeRequest], None],
        ]
    ]

    message_processor: IntervalRunner
    message_workers: ThreadPoolExecutor

    closed: bool
    connect_operation_lock: Lock

    def issue_new_wss_url(self) -> str:
        try:
            response = self.web_client.apps_connections_open(app_token=self.app_token)
            return response["url"]
        except SlackApiError as e:
            if e.response["error"] == "ratelimited":
                # NOTE: ratelimited errors rarely occur with this endpoint
                delay = int(e.response.headers.get("Retry-After", "30"))  # Tier1
                self.logger.info(f"Rate limited. Retrying in {delay} seconds...")
                time.sleep(delay)
                # Retry to issue a new WSS URL
                return self.issue_new_wss_url()
            else:
                # other errors
                self.logger.error(f"Failed to retrieve WSS URL: {e}")
                raise e

    def is_connected(self) -> bool:
        return False

    def connect(self) -> None:
        raise NotImplementedError()

    def disconnect(self) -> None:
        raise NotImplementedError()

    def connect_to_new_endpoint(self, force: bool = False):
        try:
            self.connect_operation_lock.acquire(blocking=True, timeout=5)
            if force or not self.is_connected():
                self.logger.info("Connecting to a new endpoint...")
                self.wss_uri = self.issue_new_wss_url()
                self.connect()
                self.logger.info("Connected to a new endpoint...")
        finally:
            self.connect_operation_lock.release()

    def close(self) -> None:
        self.closed = True
        self.disconnect()

    def send_message(self, message: str) -> None:
        raise NotImplementedError()

    def send_socket_mode_response(self, response: Union[Dict[str, Any], SocketModeResponse]) -> None:
        if isinstance(response, SocketModeResponse):
            self.send_message(json.dumps(response.to_dict()))
        else:
            self.send_message(json.dumps(response))

    def enqueue_message(self, message: str):
        self.message_queue.put(message)
        if self.logger.level <= logging.DEBUG:
            self.logger.debug(f"A new message enqueued (current queue size: {self.message_queue.qsize()})")

    def process_message(self):
        try:
            raw_message = self.message_queue.get(timeout=1)
            if self.logger.level <= logging.DEBUG:
                self.logger.debug(f"A message dequeued (current queue size: {self.message_queue.qsize()})")

            if raw_message is not None:
                message: dict = {}
                if raw_message.startswith("{"):
                    message = json.loads(raw_message)
                if message.get("type") == "disconnect":
                    self.connect_to_new_endpoint(force=True)
                else:

                    def _run_message_listeners():
                        self.run_message_listeners(message, raw_message)

                    self.message_workers.submit(_run_message_listeners)
        except Empty:
            pass

    def run_message_listeners(self, message: dict, raw_message: str) -> None:
        type, envelope_id = message.get("type"), message.get("envelope_id")
        if self.logger.level <= logging.DEBUG:
            self.logger.debug(f"Message processing started (type: {type}, envelope_id: {envelope_id})")
        try:
            # just in case, adding the same logic to reconnect here
            if message.get("type") == "disconnect":
                self.connect_to_new_endpoint(force=True)
                return

            for listener in self.message_listeners:
                try:
                    listener(self, message, raw_message)  # type: ignore
                except Exception as e:
                    self.logger.exception(f"Failed to run a message listener: {e}")

            if len(self.socket_mode_request_listeners) > 0:
                request = SocketModeRequest.from_dict(message)
                if request is not None:
                    for listener in self.socket_mode_request_listeners:
                        try:
                            listener(self, request)  # type: ignore
                        except Exception as e:
                            self.logger.exception(f"Failed to run a request listener: {e}")
        except Exception as e:
            self.logger.exception(f"Failed to run message listeners: {e}")
        finally:
            if self.logger.level <= logging.DEBUG:
                self.logger.debug(f"Message processing completed (type: {type}, envelope_id: {envelope_id})")

    def process_messages(self) -> None:
        while not self.closed:
            try:
                self.process_message()
            except Exception as e:
                self.logger.exception(f"Failed to process a message: {e}")

Subclasses

Class variables

var app_token : str
var closed : bool
var connect_operation_lock
var logger : logging.Logger
var message_listeners : List[Union[WebSocketMessageListener, Callable[[BaseSocketModeClient, dict, Optional[str]], None]]]
var message_processorIntervalRunner
var message_queue : queue.Queue
var message_workers : concurrent.futures.thread.ThreadPoolExecutor
var socket_mode_request_listeners : List[Union[SocketModeRequestListener, Callable[[BaseSocketModeClientSocketModeRequest], None]]]
var web_clientWebClient
var wss_uri : str

Methods

def close(self) ‑> None
def connect(self) ‑> None
def connect_to_new_endpoint(self, force: bool = False)
def disconnect(self) ‑> None
def enqueue_message(self, message: str)
def is_connected(self) ‑> bool
def issue_new_wss_url(self) ‑> str
def process_message(self)
def process_messages(self) ‑> None
def run_message_listeners(self, message: dict, raw_message: str) ‑> None
def send_message(self, message: str) ‑> None
def send_socket_mode_response(self, response: Union[Dict[str, Any], SocketModeResponse]) ‑> None