Module slack_sdk.socket_mode.builtin.connection
Classes
class Connection (url: str,
logger: logging.Logger,
proxy: str | None = None,
proxy_headers: Dict[str, str] | None = None,
ping_interval: float = 5,
receive_timeout: float = 3,
receive_buffer_size: int = 1024,
trace_enabled: bool = False,
all_message_trace_enabled: bool = False,
ping_pong_trace_enabled: bool = False,
on_message_listener: Callable[[str], None] | None = None,
on_error_listener: Callable[[Exception], None] | None = None,
on_close_listener: Callable[[int, str | None], None] | None = None,
connection_type_name: str = 'Socket Mode',
ssl_context: ssl.SSLContext | None = None)-
Expand source code
class Connection: url: str logger: Logger proxy: Optional[str] proxy_headers: Optional[Dict[str, str]] trace_enabled: bool ping_pong_trace_enabled: bool last_ping_pong_time: Optional[float] session_id: str sock: Optional[ssl.SSLSocket] on_message_listener: Optional[Callable[[str], None]] on_error_listener: Optional[Callable[[Exception], None]] on_close_listener: Optional[Callable[[int, Optional[str]], None]] def __init__( self, url: str, logger: Logger, proxy: Optional[str] = None, proxy_headers: Optional[Dict[str, str]] = None, ping_interval: float = 5, # seconds receive_timeout: float = 3, receive_buffer_size: int = 1024, trace_enabled: bool = False, all_message_trace_enabled: bool = False, ping_pong_trace_enabled: bool = False, on_message_listener: Optional[Callable[[str], None]] = None, on_error_listener: Optional[Callable[[Exception], None]] = None, on_close_listener: Optional[Callable[[int, Optional[str]], None]] = None, connection_type_name: str = "Socket Mode", ssl_context: Optional[ssl.SSLContext] = None, ): self.url = url self.logger = logger self.proxy = proxy self.proxy_headers = proxy_headers self.ping_interval = ping_interval self.receive_timeout = receive_timeout self.receive_buffer_size = receive_buffer_size if self.receive_buffer_size < 16: raise SlackClientConfigurationError("Too small receive_buffer_size detected.") self.session_id = str(uuid4()) self.trace_enabled = trace_enabled self.all_message_trace_enabled = all_message_trace_enabled self.ping_pong_trace_enabled = ping_pong_trace_enabled self.last_ping_pong_time = None self.consecutive_check_state_error_count = 0 self.sock = None # To avoid ssl.SSLError: [SSL: BAD_LENGTH] bad length self.sock_receive_lock = Lock() self.sock_send_lock = Lock() self.on_message_listener = on_message_listener self.on_error_listener = on_error_listener self.on_close_listener = on_close_listener self.connection_type_name = connection_type_name self.ssl_context = ssl_context def connect(self) -> None: try: parsed_url = urlparse(self.url.strip()) hostname: str = parsed_url.hostname port: int = parsed_url.port or (443 if parsed_url.scheme == "wss" else 80) if self.trace_enabled: self.logger.debug( f"Connecting to the address for handshake: {hostname}:{port} " f"(session id: {self.session_id})" ) sock: Union[ssl.SSLSocket, socket] = _establish_new_socket_connection( # type: ignore session_id=self.session_id, server_hostname=hostname, server_port=port, logger=self.logger, sock_send_lock=self.sock_send_lock, receive_timeout=self.receive_timeout, proxy=self.proxy, proxy_headers=self.proxy_headers, trace_enabled=self.trace_enabled, ssl_context=self.ssl_context, ) # WebSocket handshake try: path = f"{parsed_url.path}?{parsed_url.query}" sec_websocket_key = _generate_sec_websocket_key() message = f"""GET {path} HTTP/1.1 Host: {parsed_url.hostname} Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: {sec_websocket_key} Sec-WebSocket-Version: 13 """ req: str = "\r\n".join([line.lstrip() for line in message.split("\n")]) if self.trace_enabled: self.logger.debug( f"{self.connection_type_name} handshake request (session id: {self.session_id}):\n{req}" ) with self.sock_send_lock: sock.send(req.encode("utf-8")) status, headers, text = _parse_handshake_response(sock) if self.trace_enabled: self.logger.debug( f"{self.connection_type_name} handshake response (session id: {self.session_id}):\n{text}" ) # HTTP/1.1 101 Switching Protocols if status == 101: if not _validate_sec_websocket_accept(sec_websocket_key, headers): raise SlackClientNotConnectedError( f"Invalid response header detected in {self.connection_type_name} handshake response" f" (session id: {self.session_id})" ) # set this successfully connected socket self.sock = sock self.ping(f"{self.session_id}:{time.time()}") else: message = ( f"Received an unexpected response for handshake " f"(status: {status}, response: {text}, session id: {self.session_id})" ) self.logger.warning(message) except socket.error as e: code: Optional[int] = None if e.args and len(e.args) > 1 and isinstance(e.args[0], int): code = e.args[0] if code is not None: error_message = f"Error code: {code} (session id: {self.session_id}, error: {e})" if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) raise except Exception as e: error_message = f"Failed to establish a connection (session id: {self.session_id}, error: {e})" if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) if self.on_error_listener is not None: self.on_error_listener(e) self.disconnect() def disconnect(self) -> None: if self.sock is not None: with self.sock_send_lock: with self.sock_receive_lock: # Synchronize before closing this instance's socket self.sock.close() self.sock = None # After this, all operations using self.sock will be skipped self.logger.info(f"The connection has been closed (session id: {self.session_id})") def is_active(self) -> bool: return self.sock is not None def close(self) -> None: self.disconnect() def ping(self, payload: Union[str, bytes] = "") -> None: if self.trace_enabled and self.ping_pong_trace_enabled: if isinstance(payload, bytes): payload = payload.decode("utf-8") self.logger.debug("Sending a ping data frame " f"(session id: {self.session_id}, payload: {payload})") data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PING) with self.sock_send_lock: if self.sock is not None: self.sock.send(data) else: if self.ping_pong_trace_enabled: self.logger.debug("Skipped sending a ping message as the underlying socket is no longer available.") def pong(self, payload: Union[str, bytes] = "") -> None: if self.trace_enabled and self.ping_pong_trace_enabled: if isinstance(payload, bytes): payload = payload.decode("utf-8") self.logger.debug("Sending a pong data frame " f"(session id: {self.session_id}, payload: {payload})") data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_PONG) with self.sock_send_lock: if self.sock is not None: self.sock.send(data) else: if self.ping_pong_trace_enabled: self.logger.debug("Skipped sending a pong message as the underlying socket is no longer available.") def send(self, payload: str) -> None: if self.trace_enabled: if isinstance(payload, bytes): payload = payload.decode("utf-8") self.logger.debug("Sending a text data frame " f"(session id: {self.session_id}, payload: {payload})") data = _build_data_frame_for_sending(payload, FrameHeader.OPCODE_TEXT) with self.sock_send_lock: try: self.sock.send(data) except Exception as e: # In most cases, we want to retry this operation with a newly established connection. # Getting this exception means that this connection has been replaced with a new one # and it's no longer usable. # The SocketModeClient implementation can do one retry when it gets this exception. raise SlackClientNotConnectedError( f"Failed to send a message as the connection is no longer active " f"(session_id: {self.session_id}, error: {e})" ) def check_state(self) -> None: try: if self.sock is not None: try: self.ping(f"{self.session_id}:{time.time()}") except ssl.SSLZeroReturnError as e: self.logger.info( "Unable to send a ping message. Closing the connection..." f" (session id: {self.session_id}, reason: {e})" ) self.disconnect() return if self.last_ping_pong_time is not None: disconnected_seconds = int(time.time() - self.last_ping_pong_time) if self.trace_enabled and disconnected_seconds > self.ping_interval: message = ( f"{disconnected_seconds} seconds have passed " f"since this client last received a pong response from the server " f"(session id: {self.session_id})" ) self.logger.debug(message) is_stale = disconnected_seconds > self.ping_interval * 4 if is_stale: self.logger.info( "The connection seems to be stale. Disconnecting..." f" (session id: {self.session_id}," f" reason: disconnected for {disconnected_seconds}+ seconds)" ) self.disconnect() return else: self.logger.debug("This connection is already closed." f" (session id: {self.session_id})") self.consecutive_check_state_error_count = 0 except Exception as e: error_message = ( "Failed to check the state of sock " f"(session id: {self.session_id}, error: {type(e).__name__}, message: {e})" ) if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) self.consecutive_check_state_error_count += 1 if self.consecutive_check_state_error_count >= 5: self.disconnect() def run_until_completion(self, state: ConnectionState) -> None: repeated_messages = {"payload": 0} ping_count = 0 pong_count = 0 ping_pong_log_summary_size = 1000 while not state.terminated: try: if self.is_active(): received_messages: List[Tuple[Optional[FrameHeader], bytes]] = _receive_messages( sock=self.sock, sock_receive_lock=self.sock_receive_lock, logger=self.logger, receive_buffer_size=self.receive_buffer_size, all_message_trace_enabled=self.all_message_trace_enabled, ) for message in received_messages: header, data = message # ----------------- # trace logging if self.trace_enabled is True: opcode: str = _to_readable_opcode(header.opcode) if header else "-" payload: str = _parse_text_payload(data, self.logger) count: Optional[int] = repeated_messages.get(payload) if count is None: count = 1 else: count += 1 repeated_messages = {payload: count} if not self.ping_pong_trace_enabled and header is not None and header.opcode is not None: if header.opcode == FrameHeader.OPCODE_PING: ping_count += 1 if ping_count % ping_pong_log_summary_size == 0: self.logger.debug( f"Received {ping_pong_log_summary_size} ping data frame " f"(session id: {self.session_id})" ) ping_count = 0 if header.opcode == FrameHeader.OPCODE_PONG: pong_count += 1 if pong_count % ping_pong_log_summary_size == 0: self.logger.debug( f"Received {ping_pong_log_summary_size} pong data frame " f"(session id: {self.session_id})" ) pong_count = 0 ping_pong_to_skip = ( header is not None and header.opcode is not None and (header.opcode == FrameHeader.OPCODE_PING or header.opcode == FrameHeader.OPCODE_PONG) and not self.ping_pong_trace_enabled ) if not ping_pong_to_skip and count < 5: # if so many same payloads came in, the trace logging should be skipped. # e.g., after receiving "UNAUTHENTICATED: cache_error", many "opcode: -, payload: " self.logger.debug( "Received a new data frame " f"(session id: {self.session_id}, opcode: {opcode}, payload: {payload})" ) if header is None: # Skip no header message continue # ----------------- # message with opcode if header.opcode == FrameHeader.OPCODE_PING: self.pong(data) elif header.opcode == FrameHeader.OPCODE_PONG: str_message = data.decode("utf-8") elements = str_message.split(":") if len(elements) >= 2: session_id, ping_time = elements[0], elements[1] if self.session_id == session_id: try: self.last_ping_pong_time = float(ping_time) except Exception as e: self.logger.debug( "Failed to parse a pong message " f" (message: {str_message}, error: {e}" ) elif header.opcode == FrameHeader.OPCODE_TEXT: if self.on_message_listener is not None: text = data.decode("utf-8") self.on_message_listener(text) elif header.opcode == FrameHeader.OPCODE_CLOSE: if self.on_close_listener is not None: if len(data) >= 2: (code,) = struct.unpack("!H", data[:2]) reason = data[2:].decode("utf-8") self.on_close_listener(code, reason) else: self.on_close_listener(1005, "") self.disconnect() state.terminated = True else: # Just warn logging opcode = _to_readable_opcode(header.opcode) if header else "-" payload: Union[bytes, str] = data if header.opcode != FrameHeader.OPCODE_BINARY: try: payload = data.decode("utf-8") if data is not None else "" except Exception as e: self.logger.info(f"Failed to convert the data to text {e}") message = ( "Received an unsupported data frame " f"(session id: {self.session_id}, opcode: {opcode}, payload: {payload})" ) self.logger.warning(message) else: time.sleep(0.2) except socket.timeout: time.sleep(0.01) except OSError as e: # getting errno.EBADF and the socket is no longer available if e.errno == 9 and state.terminated: self.logger.debug( "The reason why you got [Errno 9] Bad file descriptor here is " "the socket is no longer available." ) else: if self.on_error_listener is not None: self.on_error_listener(e) else: error_message = "Got an OSError while receiving data" f" (session id: {self.session_id}, error: {e})" if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) # As this connection no longer works in any way, terminating it if self.is_active(): try: self.disconnect() except Exception as disconnection_error: error_message = ( "Failed to disconnect" f" (session id: {self.session_id}, error: {disconnection_error})" ) if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) state.terminated = True break except Exception as e: if self.on_error_listener is not None: self.on_error_listener(e) else: error_message = "Got an exception while receiving data" f" (session id: {self.session_id}, error: {e})" if self.trace_enabled: self.logger.exception(error_message) else: self.logger.error(error_message) state.terminated = True
Class variables
var last_ping_pong_time : float | None
-
The type of the None singleton.
var logger : logging.Logger
-
The type of the None singleton.
var on_close_listener : Callable[[int, str | None], None] | None
-
The type of the None singleton.
var on_error_listener : Callable[[Exception], None] | None
-
The type of the None singleton.
var on_message_listener : Callable[[str], None] | None
-
The type of the None singleton.
var ping_pong_trace_enabled : bool
-
The type of the None singleton.
var proxy : str | None
-
The type of the None singleton.
var proxy_headers : Dict[str, str] | None
-
The type of the None singleton.
var session_id : str
-
The type of the None singleton.
var sock : ssl.SSLSocket | None
-
The type of the None singleton.
var trace_enabled : bool
-
The type of the None singleton.
var url : str
-
The type of the None singleton.
Methods
def check_state(self) ‑> None
def close(self) ‑> None
def connect(self) ‑> None
def disconnect(self) ‑> None
def is_active(self) ‑> bool
def ping(self, payload: str | bytes = '') ‑> None
def pong(self, payload: str | bytes = '') ‑> None
def run_until_completion(self,
state: ConnectionState) ‑> Nonedef send(self, payload: str) ‑> None
class ConnectionState
-
Expand source code
class ConnectionState: # The flag supposed to be used for telling SocketModeClient # when this connection is no longer available terminated: bool def __init__(self): self.terminated = False
Class variables
var terminated : bool
-
The type of the None singleton.