Expand source code
class ThreadListenerRunner:
logger: Logger
process_before_response: bool
listener_error_handler: ListenerErrorHandler
listener_start_handler: ListenerStartHandler
listener_completion_handler: ListenerCompletionHandler
listener_executor: Executor
lazy_listener_runner: LazyListenerRunner
def __init__(
self,
logger: Logger,
process_before_response: bool,
listener_error_handler: ListenerErrorHandler,
listener_start_handler: ListenerStartHandler,
listener_completion_handler: ListenerCompletionHandler,
listener_executor: Executor,
lazy_listener_runner: LazyListenerRunner,
):
self.logger = logger
self.process_before_response = process_before_response
self.listener_error_handler = listener_error_handler
self.listener_start_handler = listener_start_handler
self.listener_completion_handler = listener_completion_handler
self.listener_executor = listener_executor
self.lazy_listener_runner = lazy_listener_runner
def run(
self,
request: BoltRequest,
response: BoltResponse,
listener_name: str,
listener: Listener,
starting_time: Optional[float] = None,
) -> Optional[BoltResponse]:
ack = request.context.ack
starting_time = starting_time if starting_time is not None else time.time()
if self.process_before_response:
if not request.lazy_only:
try:
self.listener_start_handler.handle(
request=request,
response=response,
)
returned_value = listener.run_ack_function(request=request, response=response)
if isinstance(returned_value, BoltResponse):
response = returned_value
if ack.response is None and listener.auto_acknowledgement:
ack() # automatic ack() call if the call is not yet done
except Exception as e:
# The default response status code is 500 in this case.
# You can customize this by passing your own error handler.
if response is None:
response = BoltResponse(status=500)
response.status = 500
self.listener_error_handler.handle(
error=e,
request=request,
response=response,
)
ack.response = response
finally:
self.listener_completion_handler.handle(
request=request,
response=response,
)
for lazy_func in listener.lazy_functions:
if request.lazy_function_name:
func_name = get_name_for_callable(lazy_func)
if func_name == request.lazy_function_name:
self.lazy_listener_runner.run(function=lazy_func, request=request)
# This HTTP response won't be sent to Slack API servers.
return BoltResponse(status=200)
else:
continue
else:
self._start_lazy_function(lazy_func, request)
if response is not None:
self._debug_log_completion(starting_time, response)
return response
elif ack.response is not None:
self._debug_log_completion(starting_time, ack.response)
return ack.response
else:
if listener.auto_acknowledgement:
# acknowledge immediately in case of Events API
ack()
if not request.lazy_only:
# start the listener function asynchronously
def run_ack_function_asynchronously():
nonlocal ack, request, response
try:
self.listener_start_handler.handle(
request=request,
response=response,
)
listener.run_ack_function(request=request, response=response)
except Exception as e:
# The default response status code is 500 in this case.
# You can customize this by passing your own error handler.
if listener.auto_acknowledgement:
self.listener_error_handler.handle(
error=e,
request=request,
response=response,
)
else:
if response is None:
response = BoltResponse(status=500)
response.status = 500
if ack.response is not None: # already acknowledged
response = None
self.listener_error_handler.handle(
error=e,
request=request,
response=response,
)
ack.response = response
finally:
self.listener_completion_handler.handle(
request=request,
response=response,
)
self.listener_executor.submit(run_ack_function_asynchronously)
for lazy_func in listener.lazy_functions:
if request.lazy_function_name:
func_name = get_name_for_callable(lazy_func)
if func_name == request.lazy_function_name:
self.lazy_listener_runner.run(function=lazy_func, request=request)
# This HTTP response won't be sent to Slack API servers.
return BoltResponse(status=200)
else:
continue
else:
self._start_lazy_function(lazy_func, request)
# await for the completion of ack() in the async listener execution
while ack.response is None and time.time() - starting_time <= 3:
time.sleep(0.01)
if response is None and ack.response is None:
self.logger.warning(warning_did_not_call_ack(listener_name))
return None
if response is None and ack.response is not None:
response = ack.response
self._debug_log_completion(starting_time, response)
return response
if response is not None:
return response
# None for both means no ack() in the listener
return None
def _start_lazy_function(self, lazy_func: Callable[..., None], request: BoltRequest) -> None:
# Start a lazy function asynchronously
func_name: str = get_name_for_callable(lazy_func)
self.logger.debug(debug_running_lazy_listener(func_name))
copied_request = self._build_lazy_request(request, func_name)
self.lazy_listener_runner.start(function=lazy_func, request=copied_request)
def _build_lazy_request(self, request: BoltRequest, lazy_func_name: str) -> BoltRequest:
copied_request: BoltRequest = create_copy(request.to_copyable())
copied_request.lazy_only = True
copied_request.lazy_function_name = lazy_func_name
# These are not copyable objects, so manually set for a different thread
copied_request.context["listener_runner"] = self
if request.context.get_thread_context is not None:
copied_request.context["get_thread_context"] = request.context.get_thread_context
if request.context.save_thread_context is not None:
copied_request.context["save_thread_context"] = request.context.save_thread_context
return copied_request
def _debug_log_completion(self, starting_time: float, response: BoltResponse) -> None:
millis = int((time.time() - starting_time) * 1000)
self.logger.debug(debug_responding(response.status, response.body, millis))