Module slack_bolt.middleware.assistant.async_assistant

Classes

class AsyncAssistant (*,
app_name: str = 'assistant',
thread_context_store: AsyncAssistantThreadContextStore | None = None,
logger: logging.Logger | None = None)
Expand source code
class AsyncAssistant(AsyncMiddleware):
    _thread_started_listeners: Optional[List[AsyncListener]]
    _user_message_listeners: Optional[List[AsyncListener]]
    _bot_message_listeners: Optional[List[AsyncListener]]
    _thread_context_changed_listeners: Optional[List[AsyncListener]]

    thread_context_store: Optional[AsyncAssistantThreadContextStore]
    base_logger: Optional[logging.Logger]

    def __init__(
        self,
        *,
        app_name: str = "assistant",
        thread_context_store: Optional[AsyncAssistantThreadContextStore] = None,
        logger: Optional[logging.Logger] = None,
    ):
        self.app_name = app_name
        self.thread_context_store = thread_context_store
        self.base_logger = logger

        self._thread_started_listeners = None
        self._thread_context_changed_listeners = None
        self._user_message_listeners = None
        self._bot_message_listeners = None

    def thread_started(
        self,
        *args,
        matchers: Optional[Union[Callable[..., bool], AsyncListenerMatcher]] = None,
        middleware: Optional[Union[Callable, AsyncMiddleware]] = None,
        lazy: Optional[List[Callable[..., None]]] = None,
    ):
        if self._thread_started_listeners is None:
            self._thread_started_listeners = []
        all_matchers = self._merge_matchers(
            build_listener_matcher(
                func=is_assistant_thread_started_event,
                asyncio=True,
                base_logger=self.base_logger,
            ),  # type:ignore[arg-type]
            matchers,
        )
        if is_used_without_argument(args):
            func = args[0]
            self._thread_started_listeners.append(
                self.build_listener(
                    listener_or_functions=func,
                    matchers=all_matchers,
                    middleware=middleware,  # type:ignore[arg-type]
                )
            )
            return func

        def _inner(func):
            functions = [func] + (lazy if lazy is not None else [])
            self._thread_started_listeners.append(
                self.build_listener(
                    listener_or_functions=functions,
                    matchers=all_matchers,
                    middleware=middleware,
                )
            )

            @wraps(func)
            def _wrapper(*args, **kwargs):
                return func(*args, **kwargs)

            return _wrapper

        return _inner

    def user_message(
        self,
        *args,
        matchers: Optional[Union[Callable[..., bool], AsyncListenerMatcher]] = None,
        middleware: Optional[Union[Callable, AsyncMiddleware]] = None,
        lazy: Optional[List[Callable[..., None]]] = None,
    ):
        if self._user_message_listeners is None:
            self._user_message_listeners = []
        all_matchers = self._merge_matchers(
            build_listener_matcher(
                func=is_user_message_event_in_assistant_thread,
                asyncio=True,
                base_logger=self.base_logger,
            ),  # type:ignore[arg-type]
            matchers,
        )
        if is_used_without_argument(args):
            func = args[0]
            self._user_message_listeners.append(
                self.build_listener(
                    listener_or_functions=func,
                    matchers=all_matchers,
                    middleware=middleware,  # type:ignore[arg-type]
                )
            )
            return func

        def _inner(func):
            functions = [func] + (lazy if lazy is not None else [])
            self._user_message_listeners.append(
                self.build_listener(
                    listener_or_functions=functions,
                    matchers=all_matchers,
                    middleware=middleware,
                )
            )

            @wraps(func)
            def _wrapper(*args, **kwargs):
                return func(*args, **kwargs)

            return _wrapper

        return _inner

    def bot_message(
        self,
        *args,
        matchers: Optional[Union[Callable[..., bool], AsyncListenerMatcher]] = None,
        middleware: Optional[Union[Callable, AsyncMiddleware]] = None,
        lazy: Optional[List[Callable[..., None]]] = None,
    ):
        if self._bot_message_listeners is None:
            self._bot_message_listeners = []
        all_matchers = self._merge_matchers(
            build_listener_matcher(
                func=is_bot_message_event_in_assistant_thread,
                asyncio=True,
                base_logger=self.base_logger,
            ),  # type:ignore[arg-type]
            matchers,
        )
        if is_used_without_argument(args):
            func = args[0]
            self._bot_message_listeners.append(
                self.build_listener(
                    listener_or_functions=func,
                    matchers=all_matchers,
                    middleware=middleware,  # type:ignore[arg-type]
                )
            )
            return func

        def _inner(func):
            functions = [func] + (lazy if lazy is not None else [])
            self._bot_message_listeners.append(
                self.build_listener(
                    listener_or_functions=functions,
                    matchers=all_matchers,
                    middleware=middleware,
                )
            )

            @wraps(func)
            def _wrapper(*args, **kwargs):
                return func(*args, **kwargs)

            return _wrapper

        return _inner

    def thread_context_changed(
        self,
        *args,
        matchers: Optional[Union[Callable[..., bool], AsyncListenerMatcher]] = None,
        middleware: Optional[Union[Callable, AsyncMiddleware]] = None,
        lazy: Optional[List[Callable[..., None]]] = None,
    ):
        if self._thread_context_changed_listeners is None:
            self._thread_context_changed_listeners = []
        all_matchers = self._merge_matchers(
            build_listener_matcher(
                func=is_assistant_thread_context_changed_event,
                asyncio=True,
                base_logger=self.base_logger,
            ),  # type:ignore[arg-type]
            matchers,
        )
        if is_used_without_argument(args):
            func = args[0]
            self._thread_context_changed_listeners.append(
                self.build_listener(
                    listener_or_functions=func,
                    matchers=all_matchers,
                    middleware=middleware,  # type:ignore[arg-type]
                )
            )
            return func

        def _inner(func):
            functions = [func] + (lazy if lazy is not None else [])
            self._thread_context_changed_listeners.append(
                self.build_listener(
                    listener_or_functions=functions,
                    matchers=all_matchers,
                    middleware=middleware,
                )
            )

            @wraps(func)
            def _wrapper(*args, **kwargs):
                return func(*args, **kwargs)

            return _wrapper

        return _inner

    @staticmethod
    def _merge_matchers(
        primary_matcher: Union[Callable[..., bool], AsyncListenerMatcher],
        custom_matchers: Optional[Union[Callable[..., bool], AsyncListenerMatcher]],
    ):
        return [primary_matcher] + (custom_matchers or [])  # type:ignore[operator]

    @staticmethod
    async def default_thread_context_changed(save_thread_context: AsyncSaveThreadContext, payload: dict):
        new_context: dict = payload["assistant_thread"]["context"]
        await save_thread_context(new_context)

    async def async_process(  # type:ignore[return]
        self,
        *,
        req: AsyncBoltRequest,
        resp: BoltResponse,
        next: Callable[[], Awaitable[BoltResponse]],
    ) -> Optional[BoltResponse]:
        if self._thread_context_changed_listeners is None:
            self.thread_context_changed(self.default_thread_context_changed)

        listener_runner: AsyncioListenerRunner = req.context.listener_runner
        for listeners in [
            self._thread_started_listeners,
            self._thread_context_changed_listeners,
            self._user_message_listeners,
            self._bot_message_listeners,
        ]:
            if listeners is not None:
                for listener in listeners:
                    if listener is not None and await listener.async_matches(req=req, resp=resp):
                        return await listener_runner.run(
                            request=req,
                            response=resp,
                            listener_name="assistant_listener",
                            listener=listener,
                        )
        if is_other_message_sub_event_in_assistant_thread(req.body):
            # message_changed, message_deleted, etc.
            return await req.context.ack()

        await next()

    def build_listener(
        self,
        listener_or_functions: Union[AsyncListener, Callable, List[Callable]],
        matchers: Optional[List[Union[AsyncListenerMatcher, Callable[..., Awaitable[bool]]]]] = None,
        middleware: Optional[List[AsyncMiddleware]] = None,
        base_logger: Optional[Logger] = None,
    ) -> AsyncListener:
        if isinstance(listener_or_functions, Callable):  # type:ignore[arg-type]
            listener_or_functions = [listener_or_functions]  # type:ignore[list-item]

        if isinstance(listener_or_functions, AsyncListener):
            return listener_or_functions
        elif isinstance(listener_or_functions, list):
            middleware = middleware if middleware else []
            functions = listener_or_functions
            ack_function = functions.pop(0)

            matchers = matchers if matchers else []
            listener_matchers: List[AsyncListenerMatcher] = []
            for matcher in matchers:
                if isinstance(matcher, AsyncListenerMatcher):
                    listener_matchers.append(matcher)
                else:
                    listener_matchers.append(
                        build_listener_matcher(
                            func=matcher,  # type:ignore[arg-type]
                            asyncio=True,
                            base_logger=base_logger,
                        )
                    )
            return AsyncCustomListener(
                app_name=self.app_name,
                matchers=listener_matchers,
                middleware=middleware,
                ack_function=ack_function,
                lazy_functions=functions,
                auto_acknowledgement=True,
                base_logger=base_logger or self.base_logger,
            )
        else:
            raise BoltError(f"Invalid listener: {type(listener_or_functions)} detected")

A middleware can process request data before other middleware and listener functions.

Ancestors

Class variables

var base_logger : logging.Logger | None

The type of the None singleton.

var thread_context_storeAsyncAssistantThreadContextStore | None

The type of the None singleton.

Static methods

async def default_thread_context_changed(save_thread_context: AsyncSaveThreadContext,
payload: dict)
Expand source code
@staticmethod
async def default_thread_context_changed(save_thread_context: AsyncSaveThreadContext, payload: dict):
    new_context: dict = payload["assistant_thread"]["context"]
    await save_thread_context(new_context)

Methods

def bot_message(self,
*args,
matchers: Callable[..., bool] | AsyncListenerMatcher | None = None,
middleware: Callable | AsyncMiddleware | None = None,
lazy: List[Callable[..., None]] | None = None)
Expand source code
def bot_message(
    self,
    *args,
    matchers: Optional[Union[Callable[..., bool], AsyncListenerMatcher]] = None,
    middleware: Optional[Union[Callable, AsyncMiddleware]] = None,
    lazy: Optional[List[Callable[..., None]]] = None,
):
    if self._bot_message_listeners is None:
        self._bot_message_listeners = []
    all_matchers = self._merge_matchers(
        build_listener_matcher(
            func=is_bot_message_event_in_assistant_thread,
            asyncio=True,
            base_logger=self.base_logger,
        ),  # type:ignore[arg-type]
        matchers,
    )
    if is_used_without_argument(args):
        func = args[0]
        self._bot_message_listeners.append(
            self.build_listener(
                listener_or_functions=func,
                matchers=all_matchers,
                middleware=middleware,  # type:ignore[arg-type]
            )
        )
        return func

    def _inner(func):
        functions = [func] + (lazy if lazy is not None else [])
        self._bot_message_listeners.append(
            self.build_listener(
                listener_or_functions=functions,
                matchers=all_matchers,
                middleware=middleware,
            )
        )

        @wraps(func)
        def _wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return _wrapper

    return _inner
def build_listener(self,
listener_or_functions: AsyncListener | Callable | List[Callable],
matchers: List[AsyncListenerMatcher | Callable[..., Awaitable[bool]]] | None = None,
middleware: List[AsyncMiddleware] | None = None,
base_logger: logging.Logger | None = None) ‑> AsyncListener
Expand source code
def build_listener(
    self,
    listener_or_functions: Union[AsyncListener, Callable, List[Callable]],
    matchers: Optional[List[Union[AsyncListenerMatcher, Callable[..., Awaitable[bool]]]]] = None,
    middleware: Optional[List[AsyncMiddleware]] = None,
    base_logger: Optional[Logger] = None,
) -> AsyncListener:
    if isinstance(listener_or_functions, Callable):  # type:ignore[arg-type]
        listener_or_functions = [listener_or_functions]  # type:ignore[list-item]

    if isinstance(listener_or_functions, AsyncListener):
        return listener_or_functions
    elif isinstance(listener_or_functions, list):
        middleware = middleware if middleware else []
        functions = listener_or_functions
        ack_function = functions.pop(0)

        matchers = matchers if matchers else []
        listener_matchers: List[AsyncListenerMatcher] = []
        for matcher in matchers:
            if isinstance(matcher, AsyncListenerMatcher):
                listener_matchers.append(matcher)
            else:
                listener_matchers.append(
                    build_listener_matcher(
                        func=matcher,  # type:ignore[arg-type]
                        asyncio=True,
                        base_logger=base_logger,
                    )
                )
        return AsyncCustomListener(
            app_name=self.app_name,
            matchers=listener_matchers,
            middleware=middleware,
            ack_function=ack_function,
            lazy_functions=functions,
            auto_acknowledgement=True,
            base_logger=base_logger or self.base_logger,
        )
    else:
        raise BoltError(f"Invalid listener: {type(listener_or_functions)} detected")
def thread_context_changed(self,
*args,
matchers: Callable[..., bool] | AsyncListenerMatcher | None = None,
middleware: Callable | AsyncMiddleware | None = None,
lazy: List[Callable[..., None]] | None = None)
Expand source code
def thread_context_changed(
    self,
    *args,
    matchers: Optional[Union[Callable[..., bool], AsyncListenerMatcher]] = None,
    middleware: Optional[Union[Callable, AsyncMiddleware]] = None,
    lazy: Optional[List[Callable[..., None]]] = None,
):
    if self._thread_context_changed_listeners is None:
        self._thread_context_changed_listeners = []
    all_matchers = self._merge_matchers(
        build_listener_matcher(
            func=is_assistant_thread_context_changed_event,
            asyncio=True,
            base_logger=self.base_logger,
        ),  # type:ignore[arg-type]
        matchers,
    )
    if is_used_without_argument(args):
        func = args[0]
        self._thread_context_changed_listeners.append(
            self.build_listener(
                listener_or_functions=func,
                matchers=all_matchers,
                middleware=middleware,  # type:ignore[arg-type]
            )
        )
        return func

    def _inner(func):
        functions = [func] + (lazy if lazy is not None else [])
        self._thread_context_changed_listeners.append(
            self.build_listener(
                listener_or_functions=functions,
                matchers=all_matchers,
                middleware=middleware,
            )
        )

        @wraps(func)
        def _wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return _wrapper

    return _inner
def thread_started(self,
*args,
matchers: Callable[..., bool] | AsyncListenerMatcher | None = None,
middleware: Callable | AsyncMiddleware | None = None,
lazy: List[Callable[..., None]] | None = None)
Expand source code
def thread_started(
    self,
    *args,
    matchers: Optional[Union[Callable[..., bool], AsyncListenerMatcher]] = None,
    middleware: Optional[Union[Callable, AsyncMiddleware]] = None,
    lazy: Optional[List[Callable[..., None]]] = None,
):
    if self._thread_started_listeners is None:
        self._thread_started_listeners = []
    all_matchers = self._merge_matchers(
        build_listener_matcher(
            func=is_assistant_thread_started_event,
            asyncio=True,
            base_logger=self.base_logger,
        ),  # type:ignore[arg-type]
        matchers,
    )
    if is_used_without_argument(args):
        func = args[0]
        self._thread_started_listeners.append(
            self.build_listener(
                listener_or_functions=func,
                matchers=all_matchers,
                middleware=middleware,  # type:ignore[arg-type]
            )
        )
        return func

    def _inner(func):
        functions = [func] + (lazy if lazy is not None else [])
        self._thread_started_listeners.append(
            self.build_listener(
                listener_or_functions=functions,
                matchers=all_matchers,
                middleware=middleware,
            )
        )

        @wraps(func)
        def _wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return _wrapper

    return _inner
def user_message(self,
*args,
matchers: Callable[..., bool] | AsyncListenerMatcher | None = None,
middleware: Callable | AsyncMiddleware | None = None,
lazy: List[Callable[..., None]] | None = None)
Expand source code
def user_message(
    self,
    *args,
    matchers: Optional[Union[Callable[..., bool], AsyncListenerMatcher]] = None,
    middleware: Optional[Union[Callable, AsyncMiddleware]] = None,
    lazy: Optional[List[Callable[..., None]]] = None,
):
    if self._user_message_listeners is None:
        self._user_message_listeners = []
    all_matchers = self._merge_matchers(
        build_listener_matcher(
            func=is_user_message_event_in_assistant_thread,
            asyncio=True,
            base_logger=self.base_logger,
        ),  # type:ignore[arg-type]
        matchers,
    )
    if is_used_without_argument(args):
        func = args[0]
        self._user_message_listeners.append(
            self.build_listener(
                listener_or_functions=func,
                matchers=all_matchers,
                middleware=middleware,  # type:ignore[arg-type]
            )
        )
        return func

    def _inner(func):
        functions = [func] + (lazy if lazy is not None else [])
        self._user_message_listeners.append(
            self.build_listener(
                listener_or_functions=functions,
                matchers=all_matchers,
                middleware=middleware,
            )
        )

        @wraps(func)
        def _wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return _wrapper

    return _inner

Inherited members