Module slack_sdk.socket_mode.websockets
websockets based Socket Mode client
Classes
class SocketModeClient (app_token: str, logger: Optional[logging.Logger] = None, web_client: Optional[AsyncWebClient] = None, auto_reconnect_enabled: bool = True, ping_interval: float = 10, trace_enabled: bool = False)
-
Socket Mode client
Args
app_token
- App-level token
logger
- Custom logger
web_client
- Web API client
auto_reconnect_enabled
- True if automatic reconnection is enabled (default: True)
ping_interval
- interval for ping-pong with Slack servers (seconds)
trace_enabled
- True if more verbose logs to see what's happening under the hood
Expand source code
class SocketModeClient(AsyncBaseSocketModeClient):
    # Socket Mode client implemented on top of the `websockets` library.
    # One WebSocket session is "current" at a time; connect() replaces it and
    # restarts the two per-session background tasks (monitor + receiver).
    logger: Logger
    web_client: AsyncWebClient
    app_token: str
    wss_uri: Optional[str]
    auto_reconnect_enabled: bool
    message_queue: Queue
    message_listeners: List[
        Union[
            AsyncWebSocketMessageListener,
            Callable[["AsyncBaseSocketModeClient", dict, Optional[str]], Awaitable[None]],
        ]
    ]
    socket_mode_request_listeners: List[
        Union[
            AsyncSocketModeRequestListener,
            Callable[["AsyncBaseSocketModeClient", SocketModeRequest], Awaitable[None]],
        ]
    ]

    message_receiver: Optional[Future]
    message_processor: Future

    ping_interval: float
    trace_enabled: bool

    current_session: Optional[WebSocketClientProtocol]
    current_session_monitor: Optional[Future]

    default_auto_reconnect_enabled: bool
    closed: bool
    connect_operation_lock: Lock

    def __init__(
        self,
        app_token: str,
        logger: Optional[Logger] = None,
        web_client: Optional[AsyncWebClient] = None,
        auto_reconnect_enabled: bool = True,
        ping_interval: float = 10,
        trace_enabled: bool = False,
    ):
        """Socket Mode client

        Args:
            app_token: App-level token
            logger: Custom logger
            web_client: Web API client
            auto_reconnect_enabled: True if automatic reconnection is enabled (default: True)
            ping_interval: interval for ping-pong with Slack servers (seconds)
            trace_enabled: True if more verbose logs to see what's happening under the hood
        """
        self.app_token = app_token
        self.logger = logger or logging.getLogger(__name__)
        self.web_client = web_client or AsyncWebClient()
        self.closed = False
        self.connect_operation_lock = Lock()
        # Remember the configured value so connect() can restore it after
        # close() has switched auto-reconnect off.
        self.default_auto_reconnect_enabled = auto_reconnect_enabled
        self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
        self.ping_interval = ping_interval
        self.trace_enabled = trace_enabled
        self.wss_uri = None
        self.message_queue = Queue()
        self.message_listeners = []
        self.socket_mode_request_listeners = []
        self.current_session = None
        self.current_session_monitor = None

        self.message_receiver = None
        # The message processor is scheduled right away; it is cancelled in close().
        self.message_processor = asyncio.ensure_future(self.process_messages())

    async def monitor_current_session(self) -> None:
        """Periodically check the session captured at start and reconnect if it is closed.

        The loop ends when the client is closed or when ``self.current_session``
        no longer refers to the session this task started with.
        """
        # In the asyncio runtime, accessing a shared object (self.current_session here) from
        # multiple tasks can cause race conditions and errors.
        # To avoid such, we access only the session that is active when this loop starts.
        session: WebSocketClientProtocol = self.current_session
        session_id: str = await self.session_id()
        if self.logger.level <= logging.DEBUG:
            self.logger.debug(f"A new monitor_current_session() execution loop for {session_id} started")
        try:
            while not self.closed:
                if session != self.current_session:
                    # A newer session took over; this monitor is obsolete.
                    if self.logger.level <= logging.DEBUG:
                        self.logger.debug(f"The monitor_current_session task for {session_id} is now cancelled")
                    break
                await asyncio.sleep(self.ping_interval)
                try:
                    if self.auto_reconnect_enabled and (session is None or session.closed):
                        self.logger.info(f"The session ({session_id}) seems to be already closed. Reconnecting...")
                        await self.connect_to_new_endpoint()
                except Exception as e:
                    # Best effort: log and keep monitoring so the next tick can retry.
                    self.logger.error(
                        "Failed to check the current session or reconnect to the server "
                        f"(error: {type(e).__name__}, message: {e}, session: {session_id})"
                    )
        except asyncio.CancelledError:
            if self.logger.level <= logging.DEBUG:
                self.logger.debug(f"The monitor_current_session task for {session_id} is now cancelled")
            raise

    async def receive_messages(self) -> None:
        """Receive messages from the session captured at start and enqueue them.

        The loop ends when the client is closed or when ``self.current_session``
        no longer refers to the session this task started with.
        """
        # In the asyncio runtime, accessing a shared object (self.current_session here) from
        # multiple tasks can cause race conditions and errors.
        # To avoid such, we access only the session that is active when this loop starts.
        session: WebSocketClientProtocol = self.current_session
        session_id: str = await self.session_id()
        consecutive_error_count = 0
        if self.logger.level <= logging.DEBUG:
            self.logger.debug(f"A new receive_messages() execution loop with {session_id} started")
        try:
            while not self.closed:
                if session != self.current_session:
                    # A newer session took over; this receiver is obsolete.
                    if self.logger.level <= logging.DEBUG:
                        self.logger.debug(f"The running receive_messages task for {session_id} is now cancelled")
                    break
                try:
                    message = await session.recv()
                    if message is not None:
                        if isinstance(message, bytes):
                            message = message.decode("utf-8")
                        if self.logger.level <= logging.DEBUG:
                            self.logger.debug(
                                f"Received message: {debug_redacted_message_string(message)}, session: {session_id}"
                            )
                        await self.enqueue_message(message)
                    consecutive_error_count = 0
                except Exception as e:
                    consecutive_error_count += 1
                    self.logger.error(
                        f"Failed to receive or enqueue a message: {type(e).__name__}, error: {e}, session: {session_id}"
                    )
                    if isinstance(e, websockets.ConnectionClosedError):
                        # Wait one ping interval before touching the closed connection again.
                        await asyncio.sleep(self.ping_interval)
                    else:
                        # Linear back-off: one more second per consecutive failure.
                        await asyncio.sleep(consecutive_error_count)
        except asyncio.CancelledError:
            if self.logger.level <= logging.DEBUG:
                self.logger.debug(f"The running receive_messages task for {session_id} is now cancelled")
            raise

    async def is_connected(self) -> bool:
        """Return True when the client is not closed and has a current session that is not closed."""
        return not self.closed and self.current_session is not None and not self.current_session.closed

    async def session_id(self) -> str:
        """Return the identifier of the current session (empty string when there is none)."""
        return self.build_session_id(self.current_session)

    async def connect(self):
        """Open a new WebSocket session, restart the per-session tasks, and close the previous session.

        A WSS URL is issued first if none is stored yet.
        """
        if self.wss_uri is None:
            self.wss_uri = await self.issue_new_wss_url()
        old_session: Optional[WebSocketClientProtocol] = None if self.current_session is None else self.current_session
        # NOTE: websockets does not support proxy settings
        self.current_session = await websockets.connect(
            uri=self.wss_uri,
            ping_interval=self.ping_interval,
        )
        session_id = await self.session_id()
        self.auto_reconnect_enabled = self.default_auto_reconnect_enabled
        self.logger.info(f"A new session ({session_id}) has been established")

        if self.current_session_monitor is not None:
            self.current_session_monitor.cancel()

        self.current_session_monitor = asyncio.ensure_future(self.monitor_current_session())
        if self.logger.level <= logging.DEBUG:
            self.logger.debug(f"A new monitor_current_session() executor has been recreated for {session_id}")

        if self.message_receiver is not None:
            self.message_receiver.cancel()

        self.message_receiver = asyncio.ensure_future(self.receive_messages())
        if self.logger.level <= logging.DEBUG:
            self.logger.debug(f"A new receive_messages() executor has been recreated for {session_id}")

        # The old session is closed only after the new one and its tasks are in place.
        if old_session is not None:
            await old_session.close()
            old_session_id = self.build_session_id(old_session)
            self.logger.info(f"The old session ({old_session_id}) has been abandoned")

    async def disconnect(self):
        """Close the current session, if any."""
        if self.current_session is not None:
            await self.current_session.close()

    async def send_message(self, message: str):
        """Send a text message over the current session, retrying once on a WebSocket error.

        Args:
            message: The text to send.

        Raises:
            WebSocketException: When the first attempt fails and, on retry, the
                client is no longer connected (the original error is re-raised)
                or the second send fails as well.
        """
        session = self.current_session
        session_id = self.build_session_id(session)
        if self.logger.level <= logging.DEBUG:
            self.logger.debug(f"Sending a message: {message}, session: {session_id}")
        try:
            await session.send(message)
        except WebSocketException as e:
            # We rarely get this exception while replacing the underlying WebSocket connections.
            # We can do one more try here as the self.current_session should be ready now.
            if self.logger.level <= logging.DEBUG:
                self.logger.debug(
                    f"Failed to send a message (error: {e}, message: {message}, session: {session_id})"
                    " as the underlying connection was replaced. Retrying the same request only one time..."
                )
            # Although acquiring self.connect_operation_lock also for the first method call is the safest way,
            # we avoid synchronizing a lot for better performance. That's why we are doing a retry here.
            #
            # The retry must actually hold the lock: `async with` acquires it before
            # the retry and releases it only if this task acquired it. Releasing a
            # lock this task never acquired would unlock it for whichever task
            # currently owns it.
            async with self.connect_operation_lock:
                if await self.is_connected():
                    await self.current_session.send(message)
                else:
                    self.logger.warning(
                        f"The current session ({session_id}) is no longer active. Failed to send a message"
                    )
                    raise e

    async def close(self):
        """Mark the client closed, disable auto-reconnect, disconnect, and cancel background tasks."""
        self.closed = True
        self.auto_reconnect_enabled = False
        await self.disconnect()
        self.message_processor.cancel()
        if self.current_session_monitor is not None:
            self.current_session_monitor.cancel()
        if self.message_receiver is not None:
            self.message_receiver.cancel()

    @classmethod
    def build_session_id(cls, session: WebSocketClientProtocol) -> str:
        """Build a log-friendly identifier for a session.

        Returns:
            An empty string when ``session`` is None, otherwise ``"s_"`` followed
            by the session object's hash.
        """
        if session is None:
            return ""
        return "s_" + str(hash(session))
Ancestors
Class variables
var app_token : str
var auto_reconnect_enabled : bool
var closed : bool
var connect_operation_lock : asyncio.locks.Lock
var current_session : Optional[websockets.legacy.client.WebSocketClientProtocol]
var current_session_monitor : Optional[_asyncio.Future]
var default_auto_reconnect_enabled : bool
var logger : logging.Logger
var message_listeners : List[Union[AsyncWebSocketMessageListener, Callable[[AsyncBaseSocketModeClient, dict, Optional[str]], Awaitable[None]]]]
var message_processor : _asyncio.Future
var message_queue : asyncio.queues.Queue
var message_receiver : Optional[_asyncio.Future]
var ping_interval : float
var socket_mode_request_listeners : List[Union[AsyncSocketModeRequestListener, Callable[[AsyncBaseSocketModeClient, SocketModeRequest], Awaitable[None]]]]
var trace_enabled : bool
var web_client : AsyncWebClient
var wss_uri : Optional[str]
Static methods
def build_session_id(session: websockets.legacy.client.WebSocketClientProtocol) ‑> str
Methods
async def close(self)
async def connect(self)
async def disconnect(self)
async def is_connected(self) ‑> bool
async def monitor_current_session(self) ‑> None
async def receive_messages(self) ‑> None
async def send_message(self, message: str)
async def session_id(self) ‑> str