Module slack_sdk.scim.async_client
Classes
class AsyncSCIMClient (token: str, timeout: int = 30, ssl: Optional[ssl.SSLContext] = None, proxy: Optional[str] = None, base_url: str = 'https://api.slack.com/scim/v1/', session: Optional[aiohttp.client.ClientSession] = None, trust_env_in_session: bool = False, auth: Optional[aiohttp.helpers.BasicAuth] = None, default_headers: Optional[Dict[str, str]] = None, user_agent_prefix: Optional[str] = None, user_agent_suffix: Optional[str] = None, logger: Optional[logging.Logger] = None, retry_handlers: Optional[List[slack_sdk.http_retry.async_handler.AsyncRetryHandler]] = None)
-
API client for SCIM API See https://api.slack.com/scim for more details
Args
token
- An admin user's token, which starts with
xoxp-
timeout
- Request timeout (in seconds)
ssl
ssl.SSLContext
to use for requestsproxy
- Proxy URL (e.g.,
localhost:9000
,http://localhost:9000
) base_url
- The base URL for API calls
session
aiohttp.ClientSession
instancetrust_env_in_session
- True/False for
aiohttp.ClientSession
auth
- Basic auth info for
aiohttp.ClientSession
default_headers
- Request headers to add to all requests
user_agent_prefix
- Prefix for User-Agent header value
user_agent_suffix
- Suffix for User-Agent header value
logger
- Custom logger
retry_handlers
- Retry handlers
Expand source code
class AsyncSCIMClient: BASE_URL = "https://api.slack.com/scim/v1/" token: str timeout: int ssl: Optional[SSLContext] proxy: Optional[str] base_url: str session: Optional[ClientSession] trust_env_in_session: bool auth: Optional[BasicAuth] default_headers: Dict[str, str] logger: logging.Logger retry_handlers: List[AsyncRetryHandler] def __init__( self, token: str, timeout: int = 30, ssl: Optional[SSLContext] = None, proxy: Optional[str] = None, base_url: str = BASE_URL, session: Optional[ClientSession] = None, trust_env_in_session: bool = False, auth: Optional[BasicAuth] = None, default_headers: Optional[Dict[str, str]] = None, user_agent_prefix: Optional[str] = None, user_agent_suffix: Optional[str] = None, logger: Optional[logging.Logger] = None, retry_handlers: Optional[List[AsyncRetryHandler]] = None, ): """API client for SCIM API See https://api.slack.com/scim for more details Args: token: An admin user's token, which starts with `xoxp-` timeout: Request timeout (in seconds) ssl: `ssl.SSLContext` to use for requests proxy: Proxy URL (e.g., `localhost:9000`, `http://localhost:9000`) base_url: The base URL for API calls session: `aiohttp.ClientSession` instance trust_env_in_session: True/False for `aiohttp.ClientSession` auth: Basic auth info for `aiohttp.ClientSession` default_headers: Request headers to add to all requests user_agent_prefix: Prefix for User-Agent header value user_agent_suffix: Suffix for User-Agent header value logger: Custom logger retry_handlers: Retry handlers """ self.token = token self.timeout = timeout self.ssl = ssl self.proxy = proxy self.base_url = base_url self.session = session self.trust_env_in_session = trust_env_in_session self.auth = auth self.default_headers = default_headers if default_headers else {} self.default_headers["User-Agent"] = get_user_agent(user_agent_prefix, user_agent_suffix) self.logger = logger if logger is not None else logging.getLogger(__name__) self.retry_handlers = retry_handlers if retry_handlers is not None else async_default_handlers() if self.proxy is None or len(self.proxy.strip()) == 0: env_variable = load_http_proxy_from_env(self.logger) if env_variable is not None: self.proxy = env_variable # ------------------------- # Users # ------------------------- async def search_users( self, *, # Pagination required as of August 30, 2019. count: int, start_index: int, filter: Optional[str] = None, ) -> SearchUsersResponse: return SearchUsersResponse( await self.api_call( http_verb="GET", path="Users", query_params={ "filter": filter, "count": count, "startIndex": start_index, }, ) ) async def read_user(self, id: str) -> ReadUserResponse: return ReadUserResponse(await self.api_call(http_verb="GET", path=f"Users/{quote(id)}")) async def create_user(self, user: Union[Dict[str, Any], User]) -> UserCreateResponse: return UserCreateResponse( await self.api_call( http_verb="POST", path="Users", body_params=user.to_dict() if isinstance(user, User) else _to_dict_without_not_given(user), ) ) async def patch_user(self, id: str, partial_user: Union[Dict[str, Any], User]) -> UserPatchResponse: return UserPatchResponse( await self.api_call( http_verb="PATCH", path=f"Users/{quote(id)}", body_params=partial_user.to_dict() if isinstance(partial_user, User) else _to_dict_without_not_given(partial_user), ) ) async def update_user(self, user: Union[Dict[str, Any], User]) -> UserUpdateResponse: user_id = user.id if isinstance(user, User) else user["id"] return UserUpdateResponse( await self.api_call( http_verb="PUT", path=f"Users/{quote(user_id)}", body_params=user.to_dict() if isinstance(user, User) else _to_dict_without_not_given(user), ) ) async def delete_user(self, id: str) -> UserDeleteResponse: return UserDeleteResponse( await self.api_call( http_verb="DELETE", path=f"Users/{quote(id)}", ) ) # ------------------------- # Groups # ------------------------- async def search_groups( self, *, # Pagination required as of August 30, 2019. count: int, start_index: int, filter: Optional[str] = None, ) -> SearchGroupsResponse: return SearchGroupsResponse( await self.api_call( http_verb="GET", path="Groups", query_params={ "filter": filter, "count": count, "startIndex": start_index, }, ) ) async def read_group(self, id: str) -> ReadGroupResponse: return ReadGroupResponse(await self.api_call(http_verb="GET", path=f"Groups/{quote(id)}")) async def create_group(self, group: Union[Dict[str, Any], Group]) -> GroupCreateResponse: return GroupCreateResponse( await self.api_call( http_verb="POST", path="Groups", body_params=group.to_dict() if isinstance(group, Group) else _to_dict_without_not_given(group), ) ) async def patch_group(self, id: str, partial_group: Union[Dict[str, Any], Group]) -> GroupPatchResponse: return GroupPatchResponse( await self.api_call( http_verb="PATCH", path=f"Groups/{quote(id)}", body_params=partial_group.to_dict() if isinstance(partial_group, Group) else _to_dict_without_not_given(partial_group), ) ) async def update_group(self, group: Union[Dict[str, Any], Group]) -> GroupUpdateResponse: group_id = group.id if isinstance(group, Group) else group["id"] return GroupUpdateResponse( await self.api_call( http_verb="PUT", path=f"Groups/{quote(group_id)}", body_params=group.to_dict() if isinstance(group, Group) else _to_dict_without_not_given(group), ) ) async def delete_group(self, id: str) -> GroupDeleteResponse: return GroupDeleteResponse( await self.api_call( http_verb="DELETE", path=f"Groups/{quote(id)}", ) ) # ------------------------- async def api_call( self, *, http_verb: str, path: str, query_params: Optional[Dict[str, Any]] = None, body_params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, ) -> SCIMResponse: url = f"{self.base_url}{path}" query = _build_query(query_params) if len(query) > 0: url += f"?{query}" return await self._perform_http_request( http_verb=http_verb, url=url, body_params=body_params, headers=_build_request_headers( token=self.token, default_headers=self.default_headers, additional_headers=headers, ), ) async def _perform_http_request( self, *, http_verb: str, url: str, body_params: Optional[Dict[str, Any]], headers: Dict[str, str], ) -> SCIMResponse: if body_params is not None: if body_params.get("schemas") is None: body_params["schemas"] = ["urn:scim:schemas:core:1.0"] body_params = json.dumps(body_params) headers["Content-Type"] = "application/json;charset=utf-8" session: Optional[ClientSession] = None use_running_session = self.session and not self.session.closed if use_running_session: session = self.session else: session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=self.timeout), auth=self.auth, trust_env=self.trust_env_in_session, ) last_error: Optional[Exception] = None resp: Optional[SCIMResponse] = None try: request_kwargs = { "headers": headers, "data": body_params, "ssl": self.ssl, "proxy": self.proxy, } retry_request = RetryHttpRequest( method=http_verb, url=url, headers=headers, body_params=body_params, ) retry_state = RetryState() counter_for_safety = 0 while counter_for_safety < 100: counter_for_safety += 1 # If this is a retry, the next try started here. We can reset the flag. retry_state.next_attempt_requested = False retry_response: Optional[RetryHttpResponse] = None response_body = "" if self.logger.level <= logging.DEBUG: headers_for_logging = { k: "(redacted)" if k.lower() == "authorization" else v for k, v in headers.items() } self.logger.debug( f"Sending a request - url: {url}, params: {body_params}, headers: {headers_for_logging}" ) try: async with session.request(http_verb, url, **request_kwargs) as res: try: response_body = await res.text() retry_response = RetryHttpResponse( status_code=res.status, headers=res.headers, data=response_body.encode("utf-8") if response_body is not None else None, ) except aiohttp.ContentTypeError: self.logger.debug(f"No response data returned from the following API call: {url}.") retry_response = RetryHttpResponse( status_code=res.status, headers=res.headers, ) if res.status == 429: for handler in self.retry_handlers: if await handler.can_retry_async( state=retry_state, request=retry_request, response=retry_response, ): if self.logger.level <= logging.DEBUG: self.logger.info( f"A retry handler found: {type(handler).__name__} " f"for {http_verb} {url} - rate_limited" ) await handler.prepare_for_next_attempt_async( state=retry_state, request=retry_request, response=retry_response, ) break if retry_state.next_attempt_requested is False: resp = SCIMResponse( url=url, status_code=res.status, raw_body=response_body, headers=res.headers, ) _debug_log_response(self.logger, resp) return resp except Exception as e: last_error = e for handler in self.retry_handlers: if await handler.can_retry_async( state=retry_state, request=retry_request, response=retry_response, error=e, ): if self.logger.level <= logging.DEBUG: self.logger.info( f"A retry handler found: {type(handler).__name__} " f"for {http_verb} {url} - {e}" ) await handler.prepare_for_next_attempt_async( state=retry_state, request=retry_request, response=retry_response, error=e, ) break if retry_state.next_attempt_requested is False: raise last_error if resp is not None: return resp raise last_error finally: if not use_running_session: await session.close() return resp
Class variables
var BASE_URL
var auth : Optional[aiohttp.helpers.BasicAuth]
var base_url : str
var default_headers : Dict[str, str]
var logger : logging.Logger
var proxy : Optional[str]
var retry_handlers : List[slack_sdk.http_retry.async_handler.AsyncRetryHandler]
var session : Optional[aiohttp.client.ClientSession]
var ssl : Optional[ssl.SSLContext]
var timeout : int
var token : str
var trust_env_in_session : bool
Methods
async def api_call(self, *, http_verb: str, path: str, query_params: Optional[Dict[str, Any]] = None, body_params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None) ‑> SCIMResponse
async def create_group(self, group: Union[Dict[str, Any], Group]) ‑> GroupCreateResponse
async def create_user(self, user: Union[Dict[str, Any], User]) ‑> UserCreateResponse
async def delete_group(self, id: str) ‑> GroupDeleteResponse
async def delete_user(self, id: str) ‑> UserDeleteResponse
async def patch_group(self, id: str, partial_group: Union[Dict[str, Any], Group]) ‑> GroupPatchResponse
async def patch_user(self, id: str, partial_user: Union[Dict[str, Any], User]) ‑> UserPatchResponse
async def read_group(self, id: str) ‑> ReadGroupResponse
async def read_user(self, id: str) ‑> ReadUserResponse
async def search_groups(self, *, count: int, start_index: int, filter: Optional[str] = None) ‑> SearchGroupsResponse
async def search_users(self, *, count: int, start_index: int, filter: Optional[str] = None) ‑> SearchUsersResponse
async def update_group(self, group: Union[Dict[str, Any], Group]) ‑> GroupUpdateResponse
async def update_user(self, user: Union[Dict[str, Any], User]) ‑> UserUpdateResponse