From 0000000000000000000000000000000000000000 Mon Sep 17 00:00:00 2001
From: Suthep Pomjaksilp
Date: Thu, 4 Sep 2025 09:53:41 +0200
Subject: [PATCH] Add Processor base class for one-off computation

Add a Processor base class that runs its static _run() method in a
process pool and hands the result to all registered callbacks. The
result is published via the new_data event as a DatasourceReturnSet.

Squashed from:
- Add Processor base class for one-off computation
- Only allow kwargs in start()
- Return data as DatasourceReturnSet
- Add external processpool to Processor
- Add external ProcessPoolExecutor
- Make sure cleanup runs after computation
- Add try/catch for getting the result

Review fixes on top of the series:
- Keep the finished future on the instance so done() and get_result()
  keep working after completion, and a finishing older run can no
  longer drop the future of a newer run.
- start(): avoid a mutable default argument and let per-call kwargs
  override the constructor defaults instead of being silently ignored.
- get_result(): also catch concurrent.futures.TimeoutError, which is
  not the builtin TimeoutError before Python 3.11.
- Fix the BOSS pool magic keyword in the docstring (@_boss_pool, as
  used in examples/processor.json) and other docstring typos.
---
 2 files changed, 192 insertions(+)
 create mode 100644 examples/processor.json
 create mode 100644 src/herostools/actor/processor.py

diff --git a/examples/processor.json b/examples/processor.json
new file mode 100644
--- /dev/null
+++ b/examples/processor.json
@@ -0,0 +1,8 @@
+{
+    "_id": "dummy_processor",
+    "classname": "herostools.actor.processor.DummyProcessor",
+    "arguments": {
+        "message_from_the_instance": "foooo bar baz",
+        "pool": "@_boss_pool"
+    }
+}
diff --git a/src/herostools/actor/processor.py b/src/herostools/actor/processor.py
new file mode 100644
--- /dev/null
+++ b/src/herostools/actor/processor.py
@@ -0,0 +1,184 @@
+from concurrent.futures import ProcessPoolExecutor, Future, CancelledError
+from concurrent.futures import TimeoutError as FutureTimeoutError
+from typing import Any, Callable, List
+from abc import abstractmethod
+
+from heros import DatasourceReturnSet
+from heros.event import event
+
+from boss.helper import log
+
+
+class Processor:
+    """
+    Wrapper to start a task inside a separate process.
+
+    Examples could be long running analysis tasks or computations. The tasks have to be defined in the
+    :meth:`_run` of child classes. Their return value is passed to every registered callback, including
+    :meth:`new_data`. If it can be passed into a :code:`DatasourceReturnSet`, it is then published via the event.
+
+    Args:
+        auto_start: Whether to automatically start the processor.
+        pool: Optional process pool executor to use. If not provided, a new one will be created.
+            To use the :code:`ProcessPoolExecutor` within :code:`BOSS`, pass the magic keyword :code:`@_boss_pool`.
+        *args: Positional arguments to pass to the computation.
+        **kwargs: Keyword arguments to pass to the computation.
+
+    Note:
+        :code:`auto_start` and :code:`pool` come first in the signature, so positional arguments only end up
+        in :code:`*args` after both of them were given. Prefer passing computation arguments as keywords.
+    """
+
+    def __init__(self, auto_start: bool = False, pool: ProcessPoolExecutor | None = None, *args, **kwargs):
+        # store inputs on the instance; they are passed to _run on every start()
+        self.default_args = args
+        self.default_kwargs = kwargs
+        # check if a pool was provided
+        # NOTE(review): a pool created here is owned by this instance but never shut down explicitly
+        self._pool = ProcessPoolExecutor() if pool is None else pool
+        # keep track of callbacks; each one receives the result of _run
+        self._callbacks: List[Callable[[Any], Any]] = []
+        # future of the most recent computation, None until start() is called
+        self._future: Future | None = None
+        # broadcast data after computation
+        self.register_done_callback(self.new_data)
+        if auto_start:
+            self.start()
+
+    @event
+    def new_data(self, payload) -> DatasourceReturnSet:
+        """
+        Event to broadcast the result to the heros realm.
+
+        Args:
+            payload: The result returned from the computation.
+
+        Returns:
+            The payload wrapped in a :code:`DatasourceReturnSet`, or None if it cannot be converted.
+        """
+        try:
+            return DatasourceReturnSet.from_data(payload)
+        except Exception as e:
+            log.error(f"Callback error: {e}")
+            return None
+
+    def register_done_callback(self, func: Callable[[Any], Any]) -> None:
+        """
+        Attach a callback (called when computation finishes).
+        The callback receives the result of the future.
+
+        Args:
+            func: The callback.
+        """
+        self._callbacks.append(func)
+
+    def start(self, kwargs: dict | None = None) -> None:
+        """
+        Run :meth:`_run` in the process pool.
+
+        The positional and keyword arguments given to the constructor serve as defaults.
+
+        Args:
+            kwargs: Keyword arguments to be passed to the worker. They override constructor
+                keyword arguments of the same name.
+        """
+        full_args = [*self.default_args]
+        full_kwargs = {**self.default_kwargs, **(kwargs or {})}
+        # submit the computation
+        self._future = self._pool.submit(self._run, *full_args, **full_kwargs)
+        # attach a single wrapper that hands the result to the user callbacks
+        self._future.add_done_callback(self._on_future_done)
+
+    def done(self) -> bool:
+        """
+        Check if computation finished.
+
+        Returns:
+            True if the most recent :meth:`_run` is done, False otherwise (also before :meth:`start`).
+        """
+        if self._future is None:
+            return False
+        return self._future.done()
+
+    def get_result(self, timeout: float | None = None) -> Any:
+        """
+        Optionally block until result is available.
+
+        Args:
+            timeout: Timeout during waiting for a result.
+
+        Returns:
+            The result of the most recent computation, or None if nothing was started,
+            the wait timed out or the computation was cancelled.
+
+        Raises:
+            Exception: Any exception raised inside :meth:`_run` is re-raised here.
+        """
+        if self._future is None:
+            return None
+        # concurrent.futures.TimeoutError is an alias of the builtin TimeoutError only from Python 3.11 on
+        try:
+            result = self._future.result(timeout=timeout)
+        except (FutureTimeoutError, CancelledError) as e:
+            log.error(f"Could not get result: {e!r}")
+            result = None
+        return result
+
+    def _on_future_done(self, fut: Future) -> None:
+        """
+        Internal callback that hands the result of :meth:`_run` to the registered callbacks.
+
+        The future is intentionally kept on the instance so that :meth:`done` and
+        :meth:`get_result` keep working after the computation finished.
+
+        Args:
+            fut: The future of :meth:`_run`.
+        """
+        if fut.cancelled():
+            log.error("Computation was cancelled")
+            return
+        error = fut.exception()
+        if error is not None:
+            log.error(f"Could not get result from future: {error}")
+            return
+        payload = fut.result()
+        # run user callbacks in the order they were added (iterate over a snapshot)
+        for cb in list(self._callbacks):
+            try:
+                log.debug(f"Executing callback {cb}")
+                cb(payload)
+            except Exception as e:
+                log.error(f"Callback {cb} error: {e}")
+
+    @staticmethod
+    @abstractmethod
+    def _run(*args, **kwargs):
+        """
+        Actual computation.
+        Runs in a worker process, so it only has access to picklable data (args/kwargs).
+        Subclasses must override this.
+
+        Note:
+            Processor does not use abc.ABCMeta, so a missing override is only detected when this runs.
+        """
+        raise NotImplementedError("Subclasses must implement _run()!")
+
+
+class DummyProcessor(Processor):
+    """Example processor that logs a message, sleeps and returns a fixed result."""
+
+    @staticmethod
+    def _run(message_from_the_instance: str, *args, **kwargs):
+        """
+        Actual computation.
+
+        Args:
+            message_from_the_instance: A nice message which should be printed to the log.
+            *args, **kwargs contain the args and kwargs passed from the class and from :meth:`start`.
+        """
+        import time
+
+        log.error(message_from_the_instance)
+        time.sleep(1)
+        log.error("I'll be back!")
+        return {"dummy_result": (3.14, "MHz")}
-- 
GitLab