From d1324cb29605b0a0d9f169cb0b4f31c13cd2d7e3 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Fri, 25 Oct 2024 17:47:09 +0200 Subject: [PATCH 01/58] feat: add retry discipline with timeout feature --- .../disciplines/wrappers/retry_discipline.py | 254 ++++++++++++++++++ .../wrappers/test_retry_discipline.py | 153 +++++++++++ 2 files changed, 407 insertions(+) create mode 100644 src/gemseo/disciplines/wrappers/retry_discipline.py create mode 100644 tests/disciplines/wrappers/test_retry_discipline.py diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py new file mode 100644 index 0000000000..efe2e802b9 --- /dev/null +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -0,0 +1,254 @@ +# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 3 as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+"""Retry discipline execution, with a possible timeout passed to the discipline.""" + +from __future__ import annotations + +import multiprocessing +import time +import traceback +from logging import getLogger +from timeit import default_timer as timer + +from gemseo.core.discipline import Discipline + +LOGGER = getLogger(__name__) + + +class DisciplineTimeOutError(Exception): + """Error when timeout execution is reached.""" + + +class RetryDiscipline(Discipline): + """A discipline that retries execution. + + with a possible timeout to stop the execution. + """ + + def __init__( + self, + discipline: Discipline, + n_retry=5, + wait_time=1, + disc_timeout: float | None = None, + abort_exceptions: tuple[Exception] | None = None, + ) -> None: + """ + Args: + discipline: The discipline to wrap in the retry loop + + n_retry: max number of retry + + wait_time: time to wait before 2 retries (in seconds) + + disc_timeout : Time limit in seconds the discipline can run, when exceeded + the execution is aborted. + + abort_exceptions: exceptions for which the code raises an exception + + Raises: + OSError: If ``job_template_path`` does not exist. 
+ """ # noqa:D205 D212 D415 + # super().__init__(discipline.name, grammar_type=discipline.grammar_type) + super().__init__(discipline.name) + self._discipline = discipline + self.input_grammar = discipline.input_grammar + self.output_grammar = discipline.output_grammar + self.default_input_data = discipline.default_input_data + self.n_retry = n_retry + self._wait_time = wait_time + self.disc_timeout = disc_timeout + self._abort_exceptions = abort_exceptions + + self.n_executions = 0 + """Number of performed executions of the discipline.""" + + self.process_results = None + """Results when the discipline is run using multiprocessing.""" + + def _run(self) -> None: + current_error = None + self.n_executions = 0 + + stop = True + for n in range(1, self.n_retry + 1): + stop = True + self.n_executions += 1 + + try: + if self.disc_timeout is None: + """Classic execution, no timeout set.""" + self._discipline.execute(self.local_data) + else: + """disc_timeout is set.""" + try: + self._execute_discipline_with_timeout(self.local_data) + except Exception: # noqa: TRY203 + raise + + except DisciplineTimeOutError: + """Try to re-launch the discipline as timeout is reached.""" + time.sleep(self._wait_time) + continue + + except Exception as err: + if self._abort_exceptions is not None and isinstance( + err, self._abort_exceptions + ): + msg = ( + "Failed to execute discipline %s, " + "aborting retry because of the exception type %s.", + (self._discipline.name, type(err)), + ) + LOGGER.exception(msg) + raise + msg = ( + "Catched error while executing discipline" + f" {self._discipline.name} on attempt number {n}, retrying.", + ) + LOGGER.exception(msg) + current_error = err + stop = False + if stop: + break + time.sleep(self._wait_time) + if not stop and current_error is not None: + LOGGER.error( + "Failed to execute discipline %s after %s attempts.", + self._discipline.name, + self.n_retry, + ) + raise current_error + + if self.disc_timeout is None: + 
self.local_data.update(self._discipline.get_output_data()) + else: + if n < self.n_retry: + self.local_data.update(self.process_results) + else: + msg = "Error, timeout was reached at each try." + raise DisciplineTimeOutError(msg) + + def _execute_discipline_with_timeout(self, input_data): + """Run the discipline with a timeout. + + disc_timeout is set to n seconds + """ + # prepare inputs to be sent to process: + # - inputs (= discipline to execute + inputs (self.local_data)) + # - target_func : function run by the process, function that itself + # encloses the inputs (discipline + self.local_data) + inputs = {"disc": self._discipline, "inputs": input_data} + target_func = self._disc_execution + + # Initiate the queue and the process + queue = multiprocessing.Queue() + queue.put(inputs) + p = MProcess(target=target_func, args=(queue,)) + + t_0 = timer() + p.start() + + # wait for the program completes before the timeout. + p.join(self.disc_timeout) + + if p.exception: + """Raise exception if execution failed (outside timeout case).""" + error, _traceback = p.exception + raise error + + is_proc_alive = p.is_alive() + if is_proc_alive: + """Runtime exceeds timeout, killing process and its children.""" + process_ident = p.ident + LOGGER.debug("Killing process: %s", process_ident) + + t_f = timer() + elapsed_time = t_f - t_0 + LOGGER.debug("Elapsed time: %s", elapsed_time) + + # kill all active children + list_child = multiprocessing.active_children() + for child in list_child: + LOGGER.debug("killing %s", child) + child.kill() + + # Closing queue (mandatory in order program continues, otherwise + # program waits for ending of the process launched + queue.close() + + self.process_results = None + + err_msg = "Process stopped as it exceeds timeout (%s s)", self.disc_timeout + raise DisciplineTimeOutError(err_msg) + + # retrieve data through the queue + if queue.empty(): + self.process_results = None + else: + self.process_results = queue.get() + + t_f = timer() + 
elapsed_time = t_f - t_0 + LOGGER.debug("Elapsed time in discipline: %s", elapsed_time) + + LOGGER.debug("Results: %s", self.process_results) + LOGGER.info("\n***************** Here *****************:") + LOGGER.info(self.process_results) + + def _disc_execution(self, queue): + """Discipline execution with multiprocessing package.""" + # Retrieve inputs + disc_data = queue.get() + disc = disc_data["disc"] + inputs = disc_data["inputs"] + + # execute discipline, capture error if any + disc.execute(inputs) + exec_results = disc.get_output_data() + + # return the results through the queue.put feature + queue.put(exec_results) + + +class MProcess(multiprocessing.Process): + """Overloading run() method in the multiprocessing.Proccess() class. + + with a try...except statement and setting up a Pipe() to get and store any raised + exceptions from the child process into an instance field for named exception + """ + + def __init__(self, *args, **kwargs): + """Instantiate the class.""" + multiprocessing.Process.__init__(self, *args, **kwargs) + self._pconn, self._cconn = multiprocessing.Pipe() + self._exception = None + + def run(self): + """Overloaded run method called by multiprocessing.Process.""" + try: + multiprocessing.Process.run(self) + self._cconn.send(None) + except Exception as e: # noqa: BLE001 + tb = traceback.format_exc() + self._cconn.send((e, tb)) + # raise e # You can still rise this exception if you need to + + @property + def exception(self): + """Return the exception to be catched by the parent process.""" + if self._pconn.poll(): + self._exception = self._pconn.recv() + return self._exception diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py new file mode 100644 index 0000000000..b3cdbe59eb --- /dev/null +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -0,0 +1,153 @@ +# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com +# +# This program is free software; 
you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 3 as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this program; if not, write to the Free Software Foundation, +# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. +# Contributors: +# INITIAL AUTHORS - initial API and implementation and/or +# initial documentation +# :author: Francois Gallard +# OTHER AUTHORS - MACROSCOPIC CHANGES +"""Tests for retry discipline.""" + +from __future__ import annotations + +from logging import getLogger + +import pytest +from numpy import array + +from gemseo import create_discipline +from gemseo.core.discipline import Discipline +from gemseo.disciplines.wrappers.retry_discipline import DisciplineTimeOutError +from gemseo.disciplines.wrappers.retry_discipline import RetryDiscipline + +LOGGER = getLogger(__name__) + + +def an_analytic_discipline(): + """Analytic discipline.""" + return create_discipline("AnalyticDiscipline", expressions={"y": "2*x+1"}) + + +def a_crashing_analytic_discipline(): + """Analytic discipline crashing when 0=0.""" + return create_discipline("AnalyticDiscipline", expressions={"y": "1.0/x+1"}) + + +class FailingDiscipline(Discipline): + """A failing discipline, no more used""" + + default_grammar_type = Discipline.GrammarType.SIMPLE + + def __init__(self, raise_exception=RuntimeError, stop_raising_after_n_executions=3): + super().__init__(name="failing") + self.input_grammar.update_from_names(["input"]) + self.output_grammar.update_from_names(["output"]) + self._raise_exception = raise_exception + self._stop_raising_after_n_executions = 
stop_raising_after_n_executions + + self._n_executions = 0 + + def _run(self): + self._n_executions += 1 + if self._n_executions < self._stop_raising_after_n_executions: + raise self._raise_exception() + + self.local_data["output"] = self.local_data["input"] + + +def currently_deactivated_test_basic_failure(): + """Test that the default inputs are independent.""" + discipline = FailingDiscipline() + retry = RetryDiscipline(discipline) + + retry.execute({"input": array([1])}) + + assert discipline._n_executions == 3 + + +def test_retry_discipline(): + """Test discipline, no timeout set.""" + analytic_disc = an_analytic_discipline() + + retry_discipline = RetryDiscipline(analytic_disc) + + inputs = {"x": array([4.0])} + retry_discipline.execute(inputs) + + assert retry_discipline.local_data == {"x": array([4.0]), "y": array([9.0])} + assert retry_discipline.n_executions == 1 + + +def test_retry_discipline_with_timeout(): + """Test discipline with a timeout.""" + analytic_disc = an_analytic_discipline() + timeout = 5 + + disc_with_timeout = RetryDiscipline(analytic_disc, disc_timeout=timeout) + + inputs = {"x": array([4.0])} + disc_with_timeout.execute(inputs) + + assert disc_with_timeout.local_data == {"x": array([4.0]), "y": array([9.0])} + assert disc_with_timeout.n_executions == 1 + + +def test_failure_retry_discipline_with_timeout(): + """Test failure of the discipline with a too much very short timeout.""" + analytic_disc = an_analytic_discipline() + timeout = 0.0001 + + disc_with_timeout = RetryDiscipline(analytic_disc, disc_timeout=timeout) + + inputs = {"x": array([4.0])} + + with pytest.raises(DisciplineTimeOutError) as timeout_exec: + disc_with_timeout.execute(inputs) + + assert timeout_exec.typename == "DisciplineTimeOutError" + assert str(timeout_exec.value) == "Error, timeout was reached at each try." 
+ assert disc_with_timeout.n_executions == disc_with_timeout.n_retry + + +def test_failure_zero_division_error(): + """Test failure of the discipline with a bad x entry. + + In order to catch the ZeroDivisionError, set n_retry=1 + """ + analytic_disc = a_crashing_analytic_discipline() + disc = RetryDiscipline(analytic_disc, n_retry=1) + + inputs = {"x": array([0.0])} + with pytest.raises(ZeroDivisionError): + disc.execute(inputs) + + +@pytest.mark.parametrize("n_try", [1, 3]) +def test_failure_zero_division_error_with_timeout(n_try): + """Test failure of the discipline with timeout and a bad x entry. + + In order to catch the ZeroDivisionError that arises before timeout (5s), test with + n_retry=1 and 3 to be sure every case is ok. + """ + analytic_disc = a_crashing_analytic_discipline() + disc = RetryDiscipline(analytic_disc, n_retry=n_try, disc_timeout=5.0) + + inputs = {"x": array([0.0])} + with pytest.raises(ZeroDivisionError) as err: + disc.execute(inputs) + + assert disc.n_executions == disc.n_retry + assert err.type is ZeroDivisionError + assert err.typename == "ZeroDivisionError" + assert str(err.value) == "float division by zero" -- GitLab From ebcb1d1a88f03a18efbd49f748d1e709c2266236 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Fri, 8 Nov 2024 15:02:49 +0100 Subject: [PATCH 02/58] fix: added type of variables in __init__ --- src/gemseo/disciplines/wrappers/retry_discipline.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index efe2e802b9..0f57503dc1 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -40,8 +40,8 @@ class RetryDiscipline(Discipline): def __init__( self, discipline: Discipline, - n_retry=5, - wait_time=1, + n_retry: int = 5, + wait_time: float = 1, disc_timeout: float | None = None, abort_exceptions: tuple[Exception] | 
None = None, ) -> None: @@ -79,10 +79,12 @@ class RetryDiscipline(Discipline): """Results when the discipline is run using multiprocessing.""" def _run(self) -> None: + """Method called by 'execute'.""" current_error = None self.n_executions = 0 stop = True + n = 0 for n in range(1, self.n_retry + 1): stop = True self.n_executions += 1 @@ -186,7 +188,7 @@ class RetryDiscipline(Discipline): child.kill() # Closing queue (mandatory in order program continues, otherwise - # program waits for ending of the process launched + # program waits for ending of the process that has been launched queue.close() self.process_results = None -- GitLab From 5d1721ac103d508b4531c16feb782482ecc7612e Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Fri, 8 Nov 2024 17:44:51 +0100 Subject: [PATCH 03/58] docs: fix incorrect term --- src/gemseo/disciplines/wrappers/retry_discipline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 0f57503dc1..7a6ee9e83d 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -167,7 +167,7 @@ class RetryDiscipline(Discipline): p.join(self.disc_timeout) if p.exception: - """Raise exception if execution failed (outside timeout case).""" + """Raise exception if execution failed (apart from timeout case).""" error, _traceback = p.exception raise error @@ -187,8 +187,8 @@ class RetryDiscipline(Discipline): LOGGER.debug("killing %s", child) child.kill() - # Closing queue (mandatory in order program continues, otherwise - # program waits for ending of the process that has been launched + # Closing queue (mandatory in order program continues, otherwise program + # waits for ending of the process that has been launched queue.close() self.process_results = None -- GitLab From 874f434fbf17d78990169e70d8f297496f3e3eb4 Mon Sep 17 00:00:00 2001 From: 
"jean-francois.figue" Date: Fri, 15 Nov 2024 18:14:48 +0100 Subject: [PATCH 04/58] feat: introducing concurrent.futures in place of multiprocessing package --- .../disciplines/wrappers/retry_discipline.py | 201 +++++++----------- .../wrappers/test_retry_discipline.py | 2 +- 2 files changed, 76 insertions(+), 127 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 7a6ee9e83d..209bd9fe79 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -16,9 +16,11 @@ from __future__ import annotations -import multiprocessing +import concurrent +import os +import signal import time -import traceback +from concurrent.futures import ProcessPoolExecutor from logging import getLogger from timeit import default_timer as timer @@ -43,7 +45,6 @@ class RetryDiscipline(Discipline): n_retry: int = 5, wait_time: float = 1, disc_timeout: float | None = None, - abort_exceptions: tuple[Exception] | None = None, ) -> None: """ Args: @@ -56,12 +57,9 @@ class RetryDiscipline(Discipline): disc_timeout : Time limit in seconds the discipline can run, when exceeded the execution is aborted. - abort_exceptions: exceptions for which the code raises an exception - Raises: - OSError: If ``job_template_path`` does not exist. + TimeoutError: if disc_timeout limit is reached. 
""" # noqa:D205 D212 D415 - # super().__init__(discipline.name, grammar_type=discipline.grammar_type) super().__init__(discipline.name) self._discipline = discipline self.input_grammar = discipline.input_grammar @@ -70,13 +68,12 @@ class RetryDiscipline(Discipline): self.n_retry = n_retry self._wait_time = wait_time self.disc_timeout = disc_timeout - self._abort_exceptions = abort_exceptions self.n_executions = 0 """Number of performed executions of the discipline.""" self.process_results = None - """Results when the discipline is run using multiprocessing.""" + """Results when the discipline is run.""" def _run(self) -> None: """Method called by 'execute'.""" @@ -90,32 +87,13 @@ class RetryDiscipline(Discipline): self.n_executions += 1 try: - if self.disc_timeout is None: - """Classic execution, no timeout set.""" - self._discipline.execute(self.local_data) - else: - """disc_timeout is set.""" - try: - self._execute_discipline_with_timeout(self.local_data) - except Exception: # noqa: TRY203 - raise - + self._execute_discipline(self.local_data) except DisciplineTimeOutError: """Try to re-launch the discipline as timeout is reached.""" time.sleep(self._wait_time) continue except Exception as err: - if self._abort_exceptions is not None and isinstance( - err, self._abort_exceptions - ): - msg = ( - "Failed to execute discipline %s, " - "aborting retry because of the exception type %s.", - (self._discipline.name, type(err)), - ) - LOGGER.exception(msg) - raise msg = ( "Catched error while executing discipline" f" {self._discipline.name} on attempt number {n}, retrying.", @@ -134,123 +112,94 @@ class RetryDiscipline(Discipline): ) raise current_error - if self.disc_timeout is None: - self.local_data.update(self._discipline.get_output_data()) - else: - if n < self.n_retry: - self.local_data.update(self.process_results) - else: - msg = "Error, timeout was reached at each try." 
- raise DisciplineTimeOutError(msg) + # if self.disc_timeout is None: + # self.local_data.update(self._discipline.get_output_data()) + # else: + # if n < self.n_retry: + # self.local_data.update(self.process_results) + # else: + # msg = "Error, timeout was reached at each try." + # raise DisciplineTimeOutError(msg) - def _execute_discipline_with_timeout(self, input_data): - """Run the discipline with a timeout. + def _execute_discipline(self, input_data): + """Run the discipline. - disc_timeout is set to n seconds + disc_timeout is set to n seconds, or None """ - # prepare inputs to be sent to process: - # - inputs (= discipline to execute + inputs (self.local_data)) - # - target_func : function run by the process, function that itself - # encloses the inputs (discipline + self.local_data) - inputs = {"disc": self._discipline, "inputs": input_data} - target_func = self._disc_execution - - # Initiate the queue and the process - queue = multiprocessing.Queue() - queue.put(inputs) - p = MProcess(target=target_func, args=(queue,)) + LOGGER.debug("time_out set to : %s s", self.disc_timeout) t_0 = timer() - p.start() - # wait for the program completes before the timeout. 
- p.join(self.disc_timeout) + with ProcessPoolExecutor() as executor: + run_discipline = executor.submit( + self._disc_execution, + input_data, + ) - if p.exception: - """Raise exception if execution failed (apart from timeout case).""" - error, _traceback = p.exception - raise error + LOGGER.debug("Elapsed time from starting 00: %s", timer() - t_0) - is_proc_alive = p.is_alive() - if is_proc_alive: - """Runtime exceeds timeout, killing process and its children.""" - process_ident = p.ident - LOGGER.debug("Killing process: %s", process_ident) + try: + LOGGER.debug("Running discipline and get the output") - t_f = timer() - elapsed_time = t_f - t_0 - LOGGER.debug("Elapsed time: %s", elapsed_time) + # wait for result with timeout + self.process_results = run_discipline.result(timeout=self.disc_timeout) - # kill all active children - list_child = multiprocessing.active_children() - for child in list_child: - LOGGER.debug("killing %s", child) - child.kill() + # report the result + LOGGER.debug("Results:") + LOGGER.debug(self.process_results) - # Closing queue (mandatory in order program continues, otherwise program - # waits for ending of the process that has been launched - queue.close() + self._discipline.local_data.update(self.process_results) + LOGGER.debug("Discipline outputs:") + data_outputs = self._discipline.get_output_data() + for key in data_outputs: + val = data_outputs[key] + LOGGER.debug(str(f" {key} ({type(val)}): {val}")) - self.process_results = None + LOGGER.debug("Elapsed time from starting 01: %s", timer() - t_0) - err_msg = "Process stopped as it exceeds timeout (%s s)", self.disc_timeout - raise DisciplineTimeOutError(err_msg) + except concurrent.futures.TimeoutError: + """Runtime exceeds timeout, killing process and its children.""" - # retrieve data through the queue - if queue.empty(): - self.process_results = None - else: - self.process_results = queue.get() + LOGGER.debug("TimeoutError: Time out waiting for result()") - t_f = timer() - 
elapsed_time = t_f - t_0 - LOGGER.debug("Elapsed time in discipline: %s", elapsed_time) + # abort discipline execution immediately: shutdown + kill children + # killing children is mandatory, else the program terminates only + # when timeout is reached + pid_child = [p.pid for p in executor._processes.values()] + executor.shutdown(wait=False, cancel_futures=True) - LOGGER.debug("Results: %s", self.process_results) - LOGGER.info("\n***************** Here *****************:") - LOGGER.info(self.process_results) + LOGGER.debug("killing subprocesses: %s", pid_child) + for pid in pid_child: + os.kill(pid, signal.SIGTERM) - def _disc_execution(self, queue): - """Discipline execution with multiprocessing package.""" - # Retrieve inputs - disc_data = queue.get() - disc = disc_data["disc"] - inputs = disc_data["inputs"] + LOGGER.debug( + "Elapsed time from starting (TimeoutError): %s", timer() - t_0 + ) - # execute discipline, capture error if any - disc.execute(inputs) - exec_results = disc.get_output_data() + err_msg = ( + "Process stopped as it exceeds timeout (%s s)", + self.disc_timeout, + ) + raise DisciplineTimeOutError(err_msg) # noqa: B904 - # return the results through the queue.put feature - queue.put(exec_results) + except Exception as err: # noqa: BLE001 + LOGGER.debug(type(err)) + LOGGER.debug(err) + LOGGER.debug("issue in getting output") + LOGGER.debug( + "Elapsed time from starting (Other Error): %s", timer() - t_0 + ) + raise -class MProcess(multiprocessing.Process): - """Overloading run() method in the multiprocessing.Proccess() class. 
+ t_f = timer() + elapsed_time = t_f - t_0 + LOGGER.debug("Elapsed time in discipline: %s", elapsed_time) - with a try...except statement and setting up a Pipe() to get and store any raised - exceptions from the child process into an instance field for named exception - """ + LOGGER.debug("Results: %s", self.process_results) + LOGGER.debug("\n****** Exiting _execute_discipline *****:") - def __init__(self, *args, **kwargs): - """Instantiate the class.""" - multiprocessing.Process.__init__(self, *args, **kwargs) - self._pconn, self._cconn = multiprocessing.Pipe() - self._exception = None - - def run(self): - """Overloaded run method called by multiprocessing.Process.""" - try: - multiprocessing.Process.run(self) - self._cconn.send(None) - except Exception as e: # noqa: BLE001 - tb = traceback.format_exc() - self._cconn.send((e, tb)) - # raise e # You can still rise this exception if you need to - - @property - def exception(self): - """Return the exception to be catched by the parent process.""" - if self._pconn.poll(): - self._exception = self._pconn.recv() - return self._exception + def _disc_execution(self, input_data): + """Execute discipline.""" + self._discipline.execute(input_data) + return self._discipline.get_output_data() diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index b3cdbe59eb..43ab4873b4 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -40,7 +40,7 @@ def an_analytic_discipline(): def a_crashing_analytic_discipline(): - """Analytic discipline crashing when 0=0.""" + """Analytic discipline crashing when x=0.""" return create_discipline("AnalyticDiscipline", expressions={"y": "1.0/x+1"}) -- GitLab From 492d32c39873788bf801cf7adb8ba51ded8c89a7 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Mon, 18 Nov 2024 10:36:24 +0100 Subject: [PATCH 05/58] fix: add local_data.update in retry_discipline --- 
src/gemseo/disciplines/wrappers/retry_discipline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 209bd9fe79..7aa76d5146 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -112,6 +112,7 @@ class RetryDiscipline(Discipline): ) raise current_error + self.local_data.update(self._discipline.get_output_data()) # if self.disc_timeout is None: # self.local_data.update(self._discipline.get_output_data()) # else: -- GitLab From b89a5415a9ba8fb4f1ac7ca80daabbe4bbe03c0e Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Tue, 19 Nov 2024 16:16:50 +0100 Subject: [PATCH 06/58] fix: issue in testing, modified the discipline and 1 test --- .../disciplines/wrappers/retry_discipline.py | 51 ++++++------------- .../wrappers/test_retry_discipline.py | 48 +++-------------- 2 files changed, 24 insertions(+), 75 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 7aa76d5146..ae2aeefc87 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -16,7 +16,6 @@ from __future__ import annotations -import concurrent import os import signal import time @@ -29,10 +28,6 @@ from gemseo.core.discipline import Discipline LOGGER = getLogger(__name__) -class DisciplineTimeOutError(Exception): - """Error when timeout execution is reached.""" - - class RetryDiscipline(Discipline): """A discipline that retries execution. 
@@ -72,33 +67,26 @@ class RetryDiscipline(Discipline): self.n_executions = 0 """Number of performed executions of the discipline.""" - self.process_results = None - """Results when the discipline is run.""" - - def _run(self) -> None: + def _run(self, input_data=None) -> None: """Method called by 'execute'.""" current_error = None self.n_executions = 0 stop = True - n = 0 - for n in range(1, self.n_retry + 1): + for _n in range(1, self.n_retry + 1): stop = True self.n_executions += 1 try: - self._execute_discipline(self.local_data) - except DisciplineTimeOutError: + self._execute_discipline(input_data) + except TimeoutError as time_out_error: """Try to re-launch the discipline as timeout is reached.""" + current_error = time_out_error time.sleep(self._wait_time) continue - except Exception as err: - msg = ( - "Catched error while executing discipline" - f" {self._discipline.name} on attempt number {n}, retrying.", - ) - LOGGER.exception(msg) + except Exception as err: # noqa: BLE001 + # LOGGER.exception(msg) current_error = err stop = False if stop: @@ -112,15 +100,8 @@ class RetryDiscipline(Discipline): ) raise current_error + # Transfer output data in RetryDiscipline local data. self.local_data.update(self._discipline.get_output_data()) - # if self.disc_timeout is None: - # self.local_data.update(self._discipline.get_output_data()) - # else: - # if n < self.n_retry: - # self.local_data.update(self.process_results) - # else: - # msg = "Error, timeout was reached at each try." - # raise DisciplineTimeOutError(msg) def _execute_discipline(self, input_data): """Run the discipline. 
@@ -143,13 +124,13 @@ class RetryDiscipline(Discipline): LOGGER.debug("Running discipline and get the output") # wait for result with timeout - self.process_results = run_discipline.result(timeout=self.disc_timeout) + process_results = run_discipline.result(timeout=self.disc_timeout) # report the result LOGGER.debug("Results:") - LOGGER.debug(self.process_results) + LOGGER.debug(process_results) - self._discipline.local_data.update(self.process_results) + self._discipline.local_data.update(process_results) LOGGER.debug("Discipline outputs:") data_outputs = self._discipline.get_output_data() for key in data_outputs: @@ -158,7 +139,7 @@ class RetryDiscipline(Discipline): LOGGER.debug("Elapsed time from starting 01: %s", timer() - t_0) - except concurrent.futures.TimeoutError: + except TimeoutError: """Runtime exceeds timeout, killing process and its children.""" LOGGER.debug("TimeoutError: Time out waiting for result()") @@ -178,10 +159,10 @@ class RetryDiscipline(Discipline): ) err_msg = ( - "Process stopped as it exceeds timeout (%s s)", - self.disc_timeout, + f"Process stopped as it exceeds timeout ({self.disc_timeout!s} s)" ) - raise DisciplineTimeOutError(err_msg) # noqa: B904 + LOGGER.info(err_msg) + raise TimeoutError # noqa: B904 except Exception as err: # noqa: BLE001 LOGGER.debug(type(err)) @@ -197,7 +178,7 @@ class RetryDiscipline(Discipline): elapsed_time = t_f - t_0 LOGGER.debug("Elapsed time in discipline: %s", elapsed_time) - LOGGER.debug("Results: %s", self.process_results) + LOGGER.debug("Results: %s", self._discipline.get_output_data()) LOGGER.debug("\n****** Exiting _execute_discipline *****:") def _disc_execution(self, input_data): diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 43ab4873b4..9857af8b2f 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -21,14 +21,13 @@ from __future__ import 
annotations +import concurrent.futures as cfutures from logging import getLogger import pytest from numpy import array from gemseo import create_discipline -from gemseo.core.discipline import Discipline -from gemseo.disciplines.wrappers.retry_discipline import DisciplineTimeOutError from gemseo.disciplines.wrappers.retry_discipline import RetryDiscipline LOGGER = getLogger(__name__) @@ -44,38 +43,6 @@ def a_crashing_analytic_discipline(): return create_discipline("AnalyticDiscipline", expressions={"y": "1.0/x+1"}) -class FailingDiscipline(Discipline): - """A failing discipline, no more used""" - - default_grammar_type = Discipline.GrammarType.SIMPLE - - def __init__(self, raise_exception=RuntimeError, stop_raising_after_n_executions=3): - super().__init__(name="failing") - self.input_grammar.update_from_names(["input"]) - self.output_grammar.update_from_names(["output"]) - self._raise_exception = raise_exception - self._stop_raising_after_n_executions = stop_raising_after_n_executions - - self._n_executions = 0 - - def _run(self): - self._n_executions += 1 - if self._n_executions < self._stop_raising_after_n_executions: - raise self._raise_exception() - - self.local_data["output"] = self.local_data["input"] - - -def currently_deactivated_test_basic_failure(): - """Test that the default inputs are independent.""" - discipline = FailingDiscipline() - retry = RetryDiscipline(discipline) - - retry.execute({"input": array([1])}) - - assert discipline._n_executions == 3 - - def test_retry_discipline(): """Test discipline, no timeout set.""" analytic_disc = an_analytic_discipline() @@ -107,17 +74,18 @@ def test_failure_retry_discipline_with_timeout(): """Test failure of the discipline with a too much very short timeout.""" analytic_disc = an_analytic_discipline() timeout = 0.0001 + n_retry = 1 - disc_with_timeout = RetryDiscipline(analytic_disc, disc_timeout=timeout) + disc_with_timeout = RetryDiscipline( + analytic_disc, disc_timeout=timeout, n_retry=n_retry + ) inputs = 
{"x": array([4.0])} - with pytest.raises(DisciplineTimeOutError) as timeout_exec: + with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: disc_with_timeout.execute(inputs) - - assert timeout_exec.typename == "DisciplineTimeOutError" - assert str(timeout_exec.value) == "Error, timeout was reached at each try." - assert disc_with_timeout.n_executions == disc_with_timeout.n_retry + assert timeout_exec.typename == "TimeoutError" + assert disc_with_timeout.n_executions == n_retry def test_failure_zero_division_error(): -- GitLab From 0fc90f5fd94ff8b00ade8a72ddbdbc1257e3053d Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Tue, 19 Nov 2024 16:57:34 +0100 Subject: [PATCH 07/58] fix: increase timeout for test_retry_discipline_with_timeout --- tests/disciplines/wrappers/test_retry_discipline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 9857af8b2f..eda1bf23b4 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -59,7 +59,7 @@ def test_retry_discipline(): def test_retry_discipline_with_timeout(): """Test discipline with a timeout.""" analytic_disc = an_analytic_discipline() - timeout = 5 + timeout = 10 disc_with_timeout = RetryDiscipline(analytic_disc, disc_timeout=timeout) -- GitLab From 326efbc6ded2d2fa85e49d4589425a4e7b23932d Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 20 Nov 2024 16:16:15 +0100 Subject: [PATCH 08/58] feat: re-introduction of abort_exceptions feature --- .../disciplines/wrappers/retry_discipline.py | 75 ++++++++----------- 1 file changed, 33 insertions(+), 42 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index ae2aeefc87..901370959d 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ 
b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -16,15 +16,19 @@ from __future__ import annotations +import concurrent.futures as cfutures import os import signal import time from concurrent.futures import ProcessPoolExecutor from logging import getLogger -from timeit import default_timer as timer +from typing import TYPE_CHECKING from gemseo.core.discipline import Discipline +if TYPE_CHECKING: + from collections.abc import Sequence + LOGGER = getLogger(__name__) @@ -40,20 +44,27 @@ class RetryDiscipline(Discipline): n_retry: int = 5, wait_time: float = 1, disc_timeout: float | None = None, + abort_exceptions: Sequence[Exception] | None = None, ) -> None: - """ + """Initialisation of RetryDiscipline discipline. + Args: - discipline: The discipline to wrap in the retry loop + discipline: The discipline to wrap in the retry loop. - n_retry: max number of retry + n_retry: The maximum number of retry of 'discipline'. - wait_time: time to wait before 2 retries (in seconds) + wait_time: The time to wait between 2 retries (in seconds). - disc_timeout : Time limit in seconds the discipline can run, when exceeded - the execution is aborted. + disc_timeout : The time limit in seconds the discipline can run, when + exceeded the execution is aborted. + + abort_exceptions: Tuple of exceptions for which the code raises an + exception and exit immediately without retrying a run. Raises: TimeoutError: if disc_timeout limit is reached. + Other exceptions if issue encountered during the execution of 'discipline'. 
+ """ # noqa:D205 D212 D415 super().__init__(discipline.name) self._discipline = discipline @@ -63,6 +74,7 @@ class RetryDiscipline(Discipline): self.n_retry = n_retry self._wait_time = wait_time self.disc_timeout = disc_timeout + self._abort_exceptions = abort_exceptions self.n_executions = 0 """Number of performed executions of the discipline.""" @@ -79,14 +91,23 @@ class RetryDiscipline(Discipline): try: self._execute_discipline(input_data) - except TimeoutError as time_out_error: + except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: """Try to re-launch the discipline as timeout is reached.""" current_error = time_out_error time.sleep(self._wait_time) + stop = False continue - except Exception as err: # noqa: BLE001 - # LOGGER.exception(msg) + if self._abort_exceptions is not None and isinstance( + err, self._abort_exceptions + ): + msg = ( + f"Failed to execute discipline {self._discipline.name}, " + f"aborting retry because of the exception type {type(err)!s}." + ) + LOGGER.info(msg) + raise + current_error = err stop = False if stop: @@ -110,36 +131,21 @@ class RetryDiscipline(Discipline): """ LOGGER.debug("time_out set to : %s s", self.disc_timeout) - t_0 = timer() - with ProcessPoolExecutor() as executor: run_discipline = executor.submit( self._disc_execution, input_data, ) - LOGGER.debug("Elapsed time from starting 00: %s", timer() - t_0) - try: LOGGER.debug("Running discipline and get the output") # wait for result with timeout process_results = run_discipline.result(timeout=self.disc_timeout) - # report the result - LOGGER.debug("Results:") - LOGGER.debug(process_results) - self._discipline.local_data.update(process_results) - LOGGER.debug("Discipline outputs:") - data_outputs = self._discipline.get_output_data() - for key in data_outputs: - val = data_outputs[key] - LOGGER.debug(str(f" {key} ({type(val)}): {val}")) - - LOGGER.debug("Elapsed time from starting 01: %s", timer() - t_0) - except TimeoutError: + except 
(cfutures._base.TimeoutError, TimeoutError): """Runtime exceeds timeout, killing process and its children.""" LOGGER.debug("TimeoutError: Time out waiting for result()") @@ -154,31 +160,16 @@ class RetryDiscipline(Discipline): for pid in pid_child: os.kill(pid, signal.SIGTERM) - LOGGER.debug( - "Elapsed time from starting (TimeoutError): %s", timer() - t_0 - ) - err_msg = ( f"Process stopped as it exceeds timeout ({self.disc_timeout!s} s)" ) LOGGER.info(err_msg) - raise TimeoutError # noqa: B904 + raise except Exception as err: # noqa: BLE001 LOGGER.debug(type(err)) - LOGGER.debug(err) - - LOGGER.debug("issue in getting output") - LOGGER.debug( - "Elapsed time from starting (Other Error): %s", timer() - t_0 - ) raise - t_f = timer() - elapsed_time = t_f - t_0 - LOGGER.debug("Elapsed time in discipline: %s", elapsed_time) - - LOGGER.debug("Results: %s", self._discipline.get_output_data()) LOGGER.debug("\n****** Exiting _execute_discipline *****:") def _disc_execution(self, input_data): -- GitLab From db5891fdbc4c8804142a8f068abd63e4cbaaeb5b Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 20 Nov 2024 17:11:33 +0100 Subject: [PATCH 09/58] test: add new test for abort_exceptions feature --- .../wrappers/test_retry_discipline.py | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index eda1bf23b4..4d201380d2 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -23,13 +23,18 @@ from __future__ import annotations import concurrent.futures as cfutures from logging import getLogger +from typing import TYPE_CHECKING import pytest from numpy import array from gemseo import create_discipline +from gemseo.core.discipline import Discipline from gemseo.disciplines.wrappers.retry_discipline import RetryDiscipline +if TYPE_CHECKING: + from gemseo.typing import 
StrKeyMapping + LOGGER = getLogger(__name__) @@ -43,6 +48,19 @@ def a_crashing_analytic_discipline(): return create_discipline("AnalyticDiscipline", expressions={"y": "1.0/x+1"}) +class CrashingDisciplineInRun(Discipline): + """A discipline rising NotImplementedError in _run.""" + + def __init__(self, name: str = "", a_string=None) -> None: + """A discipline rising NotImplementedError in _run.""" + super().__init__(name=name) + self.a_string = a_string + + def _run(self, input_data: StrKeyMapping): + """The run method.""" + raise NotImplementedError + + def test_retry_discipline(): """Test discipline, no timeout set.""" analytic_disc = an_analytic_discipline() @@ -119,3 +137,61 @@ def test_failure_zero_division_error_with_timeout(n_try): assert err.type is ZeroDivisionError assert err.typename == "ZeroDivisionError" assert str(err.value) == "float division by zero" + + +def test_retry_discipline_with_abort_exceptions(): + """Test discipline with a_crashing_analytic_discipline and a tuple of + + a tuple of abort_exceptions that abort the retry (ZeroDivisionError). + """ + n_retry = 5 + disc_timeout = 100.0 + abort_exceptions = (ZeroDivisionError, FloatingPointError, OverflowError) + analytic_disc = a_crashing_analytic_discipline() + retry_discipline = RetryDiscipline( + analytic_disc, + n_retry=n_retry, + disc_timeout=disc_timeout, + abort_exceptions=abort_exceptions, + ) + inputs = {"x": array([0.0])} + + with pytest.raises(ZeroDivisionError) as err: + retry_discipline.execute(inputs) + + assert retry_discipline.n_executions == 1 + assert err.type is ZeroDivisionError + assert err.typename == "ZeroDivisionError" + assert str(err.value) == "float division by zero" + + +def test_a_not_implemented_error_analytic_discipline(): + """Test discipline with a_crashing_discipline_in_run and a tuple of + + a tuple of abort_exceptions that abort the retry (ZeroDivisionError). 
+ """ + n_retry = 5 + disc_timeout = 100.0 + abort_exceptions = ( + ZeroDivisionError, + FloatingPointError, + OverflowError, + NotImplementedError, + ) + + analytic_disc = CrashingDisciplineInRun(name="Crash_run", a_string="Hello") + retry_discipline = RetryDiscipline( + analytic_disc, + n_retry=n_retry, + disc_timeout=disc_timeout, + abort_exceptions=abort_exceptions, + ) + + inputs = {"x": array([1.0])} + + with pytest.raises(NotImplementedError) as err: + retry_discipline.execute(inputs) + + assert retry_discipline.n_executions == 1 + assert err.type is NotImplementedError + assert err.typename == "NotImplementedError" -- GitLab From 119f2daa775296f32d182bdb62ee63e0ef1172a9 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Thu, 21 Nov 2024 14:02:26 +0100 Subject: [PATCH 10/58] test: add new test to complete the coverage --- src/gemseo/disciplines/wrappers/retry_discipline.py | 3 +-- tests/disciplines/wrappers/test_retry_discipline.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 901370959d..b140f3f633 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -174,5 +174,4 @@ class RetryDiscipline(Discipline): def _disc_execution(self, input_data): """Execute discipline.""" - self._discipline.execute(input_data) - return self._discipline.get_output_data() + return self._discipline.execute(input_data) diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 4d201380d2..194fff4fd0 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -195,3 +195,14 @@ def test_a_not_implemented_error_analytic_discipline(): assert retry_discipline.n_executions == 1 assert err.type is NotImplementedError assert err.typename == 
"NotImplementedError" + + +def test_disc_execution(): + """Test the _disc_execution method.""" + analytic_disc = an_analytic_discipline() + disc = RetryDiscipline(analytic_disc) + + inputs = {"x": array([4.0])} + results = disc._disc_execution(inputs) + + assert results == {"x": array([4.0]), "y": array([9.0])} -- GitLab From e32f88d9b05128bc8527e3a399fd1510a82f49fd Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 18 Dec 2024 15:42:04 +0100 Subject: [PATCH 11/58] test: add new test for timeout feature --- .../disciplines/wrappers/retry_discipline.py | 9 +++- .../wrappers/test_retry_discipline.py | 49 ++++++++++++++++++- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index b140f3f633..749a39d57a 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -90,7 +90,14 @@ class RetryDiscipline(Discipline): self.n_executions += 1 try: - self._execute_discipline(input_data) + """If no timeout set, normal execution inside the "retry" loop + else enclose the discipline in ProcessPoolExecutor to run it. 
+ """ + if self.disc_timeout is None: + process_results = self._discipline.execute(input_data) + self._discipline.local_data.update(process_results) + else: + self._execute_discipline(input_data) except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: """Try to re-launch the discipline as timeout is reached.""" current_error = time_out_error diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 194fff4fd0..02d8602ee6 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -22,6 +22,7 @@ from __future__ import annotations import concurrent.futures as cfutures +import math from logging import getLogger from typing import TYPE_CHECKING @@ -61,6 +62,23 @@ class CrashingDisciplineInRun(Discipline): raise NotImplementedError +class DisciplineLongTimeRunning(Discipline): + """A discipline that could run for a while, to test the timeout feature.""" + + def __init__(self, name: str = "", a_string=None) -> None: + """A discipline that could run for a while.""" + super().__init__(name=name) + self.a_string = a_string + + def _run(self, input_data: StrKeyMapping): + """The run method.""" + x = 0.0 + for i in range(300): + for j in range(1000): + for _k in range(1000): + x = x + math.cos(i) * math.sin(j) + + def test_retry_discipline(): """Test discipline, no timeout set.""" analytic_disc = an_analytic_discipline() @@ -102,6 +120,7 @@ def test_failure_retry_discipline_with_timeout(): with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: disc_with_timeout.execute(inputs) + assert timeout_exec.typename == "TimeoutError" assert disc_with_timeout.n_executions == n_retry @@ -126,14 +145,22 @@ def test_failure_zero_division_error_with_timeout(n_try): In order to catch the ZeroDivisionError that arises before timeout (5s), test with n_retry=1 and 3 to be sure every case is ok. 
""" + + abort_exceptions = (ZeroDivisionError, FloatingPointError, OverflowError) + analytic_disc = a_crashing_analytic_discipline() - disc = RetryDiscipline(analytic_disc, n_retry=n_try, disc_timeout=5.0) + disc = RetryDiscipline( + analytic_disc, + n_retry=n_try, + disc_timeout=5.0, + abort_exceptions=abort_exceptions, + ) inputs = {"x": array([0.0])} with pytest.raises(ZeroDivisionError) as err: disc.execute(inputs) - assert disc.n_executions == disc.n_retry + assert disc.n_executions == 1 assert err.type is ZeroDivisionError assert err.typename == "ZeroDivisionError" assert str(err.value) == "float division by zero" @@ -197,6 +224,24 @@ def test_a_not_implemented_error_analytic_discipline(): assert err.typename == "NotImplementedError" +def test_retry_discipline_timeout_feature(): + """Test the timeout feature of discipline with a long computation.""" + disc = DisciplineLongTimeRunning() + timeout = 3.0 + n_retry = 1 + + disc_with_timeout = RetryDiscipline(disc, disc_timeout=timeout, n_retry=n_retry) + + inputs = {"x": array([0.0])} + + with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: + disc_with_timeout.execute(inputs) + + assert timeout_exec.typename == "TimeoutError" + assert disc_with_timeout.n_executions == n_retry + assert disc_with_timeout.local_data == {} + + def test_disc_execution(): """Test the _disc_execution method.""" analytic_disc = an_analytic_discipline() -- GitLab From b06557ffc0a6d3cdd962da503d9a3442669ef8fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Wed, 8 Jan 2025 15:59:28 +0000 Subject: [PATCH 12/58] refactor + docs: light modifications after code review --- .../disciplines/wrappers/retry_discipline.py | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 749a39d57a..2d69985f77 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py 
+++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -12,7 +12,12 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -"""Retry discipline execution, with a possible timeout passed to the discipline.""" +"""The retry discipline module. + +A RetryDiscipline allows a discipline to be executed multiple times (up to a specified number of retries) +if the previous attempts fail to produce any result. +For example, a timeout can be specified to prevent executions from becoming stuck. +""" from __future__ import annotations @@ -33,9 +38,12 @@ LOGGER = getLogger(__name__) class RetryDiscipline(Discipline): - """A discipline that retries execution. + """A discipline to be executed with a retry. + +It can be executed multiple times (up to a specified number of retries) +if the previous attempts fail to produce any result. +For example, a timeout can be specified to prevent executions from becoming stuck. - with a possible timeout to stop the execution. """ def __init__( @@ -46,19 +54,14 @@ class RetryDiscipline(Discipline): disc_timeout: float | None = None, abort_exceptions: Sequence[Exception] | None = None, ) -> None: - """Initialisation of RetryDiscipline discipline. - + """ Args: discipline: The discipline to wrap in the retry loop. - - n_retry: The maximum number of retry of 'discipline'. - + n_retry: The maximum number of retry of the discipline. wait_time: The time to wait between 2 retries (in seconds). - - disc_timeout : The time limit in seconds the discipline can run, when - exceeded the execution is aborted. - - abort_exceptions: Tuple of exceptions for which the code raises an + disc_timeout : The maximum duration, in seconds, that the discipline is allowed to run. + If this time limit is exceeded, the execution is terminated. 
+ abort_exceptions: The exceptions for which the code raises an exception and exit immediately without retrying a run. Raises: -- GitLab From b7d2d98f08a380d4ac82464967e0d762a88dff2f Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 8 Jan 2025 17:19:52 +0100 Subject: [PATCH 13/58] docs: modifications after code review --- .../disciplines/wrappers/retry_discipline.py | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 2d69985f77..4763bd57cc 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -14,8 +14,9 @@ # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. """The retry discipline module. -A RetryDiscipline allows a discipline to be executed multiple times (up to a specified number of retries) -if the previous attempts fail to produce any result. +A RetryDiscipline allows a discipline to be executed multiple times (up to a +specified number of retries) if the previous attempts fail to produce any result. + For example, a timeout can be specified to prevent executions from becoming stuck. """ @@ -40,12 +41,15 @@ LOGGER = getLogger(__name__) class RetryDiscipline(Discipline): """A discipline to be executed with a retry. -It can be executed multiple times (up to a specified number of retries) -if the previous attempts fail to produce any result. -For example, a timeout can be specified to prevent executions from becoming stuck. + It can be executed multiple times (up to a specified number of retries) + if the previous attempts fail to produce any result. + For example, a timeout can be specified to prevent executions from becoming stuck. 
""" + n_executions = 0 + """Number of performed executions of the discipline.""" + def __init__( self, discipline: Discipline, @@ -59,8 +63,9 @@ For example, a timeout can be specified to prevent executions from becoming stuc discipline: The discipline to wrap in the retry loop. n_retry: The maximum number of retry of the discipline. wait_time: The time to wait between 2 retries (in seconds). - disc_timeout : The maximum duration, in seconds, that the discipline is allowed to run. - If this time limit is exceeded, the execution is terminated. + disc_timeout : The maximum duration, in seconds, that the discipline is + allowed to run. If this time limit is exceeded, the + execution is terminated. abort_exceptions: The exceptions for which the code raises an exception and exit immediately without retrying a run. @@ -79,11 +84,7 @@ For example, a timeout can be specified to prevent executions from becoming stuc self.disc_timeout = disc_timeout self._abort_exceptions = abort_exceptions - self.n_executions = 0 - """Number of performed executions of the discipline.""" - def _run(self, input_data=None) -> None: - """Method called by 'execute'.""" current_error = None self.n_executions = 0 @@ -102,7 +103,7 @@ For example, a timeout can be specified to prevent executions from becoming stuc else: self._execute_discipline(input_data) except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: - """Try to re-launch the discipline as timeout is reached.""" + LOGGER.debug("Try to re-launch the discipline as timeout is reached.") current_error = time_out_error time.sleep(self._wait_time) stop = False @@ -131,11 +132,10 @@ For example, a timeout can be specified to prevent executions from becoming stuc ) raise current_error - # Transfer output data in RetryDiscipline local data. self.local_data.update(self._discipline.get_output_data()) def _execute_discipline(self, input_data): - """Run the discipline. + """Execute the discipline. 
disc_timeout is set to n seconds, or None """ @@ -150,9 +150,7 @@ For example, a timeout can be specified to prevent executions from becoming stuc try: LOGGER.debug("Running discipline and get the output") - # wait for result with timeout process_results = run_discipline.result(timeout=self.disc_timeout) - self._discipline.local_data.update(process_results) except (cfutures._base.TimeoutError, TimeoutError): -- GitLab From 122f48ccc227b1304a3bc063b55a53e396e945e6 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Thu, 9 Jan 2025 11:47:01 +0100 Subject: [PATCH 14/58] refactor: fix typing for some methods and variable _abort_exceptions --- .../disciplines/wrappers/retry_discipline.py | 25 +++++++------- .../wrappers/test_retry_discipline.py | 33 +++++++------------ 2 files changed, 24 insertions(+), 34 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 4763bd57cc..bebae674a2 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -33,7 +33,9 @@ from typing import TYPE_CHECKING from gemseo.core.discipline import Discipline if TYPE_CHECKING: - from collections.abc import Sequence + from collections.abc import Iterable + + from gemseo.typing import StrKeyMapping LOGGER = getLogger(__name__) @@ -56,7 +58,7 @@ class RetryDiscipline(Discipline): n_retry: int = 5, wait_time: float = 1, disc_timeout: float | None = None, - abort_exceptions: Sequence[Exception] | None = None, + abort_exceptions: Iterable[type[Exception]] = (), ) -> None: """ Args: @@ -84,7 +86,7 @@ class RetryDiscipline(Discipline): self.disc_timeout = disc_timeout self._abort_exceptions = abort_exceptions - def _run(self, input_data=None) -> None: + def _run(self, input_data: StrKeyMapping | None = None) -> StrKeyMapping | None: current_error = None self.n_executions = 0 @@ -109,9 +111,7 @@ class RetryDiscipline(Discipline): stop = False continue 
except Exception as err: # noqa: BLE001 - if self._abort_exceptions is not None and isinstance( - err, self._abort_exceptions - ): + if type(err) in self._abort_exceptions: msg = ( f"Failed to execute discipline {self._discipline.name}, " f"aborting retry because of the exception type {type(err)!s}." @@ -132,9 +132,10 @@ class RetryDiscipline(Discipline): ) raise current_error - self.local_data.update(self._discipline.get_output_data()) + LOGGER.debug("Transfer output data in RetryDiscipline local data.") + return self._discipline.get_output_data() - def _execute_discipline(self, input_data): + def _execute_discipline(self, input_data: StrKeyMapping) -> None: """Execute the discipline. disc_timeout is set to n seconds, or None @@ -143,7 +144,7 @@ class RetryDiscipline(Discipline): with ProcessPoolExecutor() as executor: run_discipline = executor.submit( - self._disc_execution, + self._discipline.execute, input_data, ) @@ -180,6 +181,6 @@ class RetryDiscipline(Discipline): LOGGER.debug("\n****** Exiting _execute_discipline *****:") - def _disc_execution(self, input_data): - """Execute discipline.""" - return self._discipline.execute(input_data) + # def _disc_execution(self, input_data: StrKeyMapping) -> DisciplineData: + # """Execute discipline.""" + # return self._discipline.execute(input_data) diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 02d8602ee6..24c87975fc 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -39,12 +39,12 @@ if TYPE_CHECKING: LOGGER = getLogger(__name__) -def an_analytic_discipline(): +def an_analytic_discipline() -> Discipline: """Analytic discipline.""" return create_discipline("AnalyticDiscipline", expressions={"y": "2*x+1"}) -def a_crashing_analytic_discipline(): +def a_crashing_analytic_discipline() -> Discipline: """Analytic discipline crashing when x=0.""" return 
create_discipline("AnalyticDiscipline", expressions={"y": "1.0/x+1"}) @@ -70,7 +70,7 @@ class DisciplineLongTimeRunning(Discipline): super().__init__(name=name) self.a_string = a_string - def _run(self, input_data: StrKeyMapping): + def _run(self, input_data: StrKeyMapping) -> None: """The run method.""" x = 0.0 for i in range(300): @@ -79,7 +79,7 @@ class DisciplineLongTimeRunning(Discipline): x = x + math.cos(i) * math.sin(j) -def test_retry_discipline(): +def test_retry_discipline() -> None: """Test discipline, no timeout set.""" analytic_disc = an_analytic_discipline() @@ -92,7 +92,7 @@ def test_retry_discipline(): assert retry_discipline.n_executions == 1 -def test_retry_discipline_with_timeout(): +def test_retry_discipline_with_timeout() -> None: """Test discipline with a timeout.""" analytic_disc = an_analytic_discipline() timeout = 10 @@ -106,7 +106,7 @@ def test_retry_discipline_with_timeout(): assert disc_with_timeout.n_executions == 1 -def test_failure_retry_discipline_with_timeout(): +def test_failure_retry_discipline_with_timeout() -> None: """Test failure of the discipline with a too much very short timeout.""" analytic_disc = an_analytic_discipline() timeout = 0.0001 @@ -125,7 +125,7 @@ def test_failure_retry_discipline_with_timeout(): assert disc_with_timeout.n_executions == n_retry -def test_failure_zero_division_error(): +def test_failure_zero_division_error() -> None: """Test failure of the discipline with a bad x entry. In order to catch the ZeroDivisionError, set n_retry=1 @@ -139,7 +139,7 @@ def test_failure_zero_division_error(): @pytest.mark.parametrize("n_try", [1, 3]) -def test_failure_zero_division_error_with_timeout(n_try): +def test_failure_zero_division_error_with_timeout(n_try: int) -> None: """Test failure of the discipline with timeout and a bad x entry. 
In order to catch the ZeroDivisionError that arises before timeout (5s), test with @@ -166,7 +166,7 @@ def test_failure_zero_division_error_with_timeout(n_try): assert str(err.value) == "float division by zero" -def test_retry_discipline_with_abort_exceptions(): +def test_retry_discipline_with_abort_exceptions() -> None: """Test discipline with a_crashing_analytic_discipline and a tuple of a tuple of abort_exceptions that abort the retry (ZeroDivisionError). @@ -192,7 +192,7 @@ def test_retry_discipline_with_abort_exceptions(): assert str(err.value) == "float division by zero" -def test_a_not_implemented_error_analytic_discipline(): +def test_a_not_implemented_error_analytic_discipline() -> None: """Test discipline with a_crashing_discipline_in_run and a tuple of a tuple of abort_exceptions that abort the retry (ZeroDivisionError). @@ -224,7 +224,7 @@ def test_a_not_implemented_error_analytic_discipline(): assert err.typename == "NotImplementedError" -def test_retry_discipline_timeout_feature(): +def test_retry_discipline_timeout_feature() -> None: """Test the timeout feature of discipline with a long computation.""" disc = DisciplineLongTimeRunning() timeout = 3.0 @@ -240,14 +240,3 @@ def test_retry_discipline_timeout_feature(): assert timeout_exec.typename == "TimeoutError" assert disc_with_timeout.n_executions == n_retry assert disc_with_timeout.local_data == {} - - -def test_disc_execution(): - """Test the _disc_execution method.""" - analytic_disc = an_analytic_discipline() - disc = RetryDiscipline(analytic_disc) - - inputs = {"x": array([4.0])} - results = disc._disc_execution(inputs) - - assert results == {"x": array([4.0]), "y": array([9.0])} -- GitLab From f3bfd738f285b09b39d881b2513c1a0346a7f540 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Thu, 9 Jan 2025 15:30:52 +0100 Subject: [PATCH 15/58] refactor: remove method _disc_execution --- src/gemseo/disciplines/wrappers/retry_discipline.py | 4 ---- 1 file changed, 4 deletions(-) diff --git 
a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index bebae674a2..aad233e6d9 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -180,7 +180,3 @@ class RetryDiscipline(Discipline): raise LOGGER.debug("\n****** Exiting _execute_discipline *****:") - - # def _disc_execution(self, input_data: StrKeyMapping) -> DisciplineData: - # """Execute discipline.""" - # return self._discipline.execute(input_data) -- GitLab From a8c9065a999c7bb1b3832a0487fb3802a4cdf80d Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Tue, 14 Jan 2025 16:20:40 +0100 Subject: [PATCH 16/58] refactor: modify some variable names and test --- changelog/fragments/927.added.rst | 1 + .../disciplines/wrappers/retry_discipline.py | 52 +++++++------ .../wrappers/test_retry_discipline.py | 77 ++++++++----------- 3 files changed, 63 insertions(+), 67 deletions(-) create mode 100644 changelog/fragments/927.added.rst diff --git a/changelog/fragments/927.added.rst b/changelog/fragments/927.added.rst new file mode 100644 index 0000000000..e1649f26a6 --- /dev/null +++ b/changelog/fragments/927.added.rst @@ -0,0 +1 @@ +- New discipline RetryDiscipline : Wraps a discipline to retry the execution several times. Try to execute the discipline, if it raises an exception retries up to a maximum number of attempts. Can pass a tuple of exceptions that, if one of them raised, do not retry the execution. diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index aad233e6d9..d34388a09e 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -41,21 +41,27 @@ LOGGER = getLogger(__name__) class RetryDiscipline(Discipline): - """A discipline to be executed with a retry. + """A discipline to be executed with retry and timeout options. 
+ + This :class:`.Discipline` wraps another discipline. It can be executed multiple times (up to a specified number of retries) if the previous attempts fail to produce any result. - For example, a timeout can be specified to prevent executions from becoming stuck. + + A timeout in seconds can be specified to prevent executions from becoming stuck. + + User can also provide a tuple of :class:`.Exceptions that, if one of them raised, + it does not retry the execution. """ - n_executions = 0 + n_executions: int """Number of performed executions of the discipline.""" def __init__( self, discipline: Discipline, - n_retry: int = 5, + n_max_retry: int = 5, wait_time: float = 1, disc_timeout: float | None = None, abort_exceptions: Iterable[type[Exception]] = (), @@ -63,7 +69,7 @@ class RetryDiscipline(Discipline): """ Args: discipline: The discipline to wrap in the retry loop. - n_retry: The maximum number of retry of the discipline. + n_max_retry: The maximum number of retry of the discipline. wait_time: The time to wait between 2 retries (in seconds). disc_timeout : The maximum duration, in seconds, that the discipline is allowed to run. 
If this time limit is exceeded, the @@ -77,21 +83,21 @@ class RetryDiscipline(Discipline): """ # noqa:D205 D212 D415 super().__init__(discipline.name) - self._discipline = discipline + self.__discipline = discipline self.input_grammar = discipline.input_grammar self.output_grammar = discipline.output_grammar self.default_input_data = discipline.default_input_data - self.n_retry = n_retry - self._wait_time = wait_time + self.n_max_retry = n_max_retry + self.wait_time = wait_time self.disc_timeout = disc_timeout - self._abort_exceptions = abort_exceptions + self.abort_exceptions = abort_exceptions def _run(self, input_data: StrKeyMapping | None = None) -> StrKeyMapping | None: current_error = None self.n_executions = 0 stop = True - for _n in range(1, self.n_retry + 1): + for _n in range(1, self.n_max_retry + 1): stop = True self.n_executions += 1 @@ -100,20 +106,20 @@ class RetryDiscipline(Discipline): else enclose the discipline in ProcessPoolExecutor to run it. """ if self.disc_timeout is None: - process_results = self._discipline.execute(input_data) - self._discipline.local_data.update(process_results) + process_results = self.__discipline.execute(input_data) + self.__discipline.local_data.update(process_results) else: self._execute_discipline(input_data) except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: LOGGER.debug("Try to re-launch the discipline as timeout is reached.") current_error = time_out_error - time.sleep(self._wait_time) + time.sleep(self.wait_time) stop = False continue except Exception as err: # noqa: BLE001 - if type(err) in self._abort_exceptions: + if type(err) in self.abort_exceptions: msg = ( - f"Failed to execute discipline {self._discipline.name}, " + f"Failed to execute discipline {self.__discipline.name}, " f"aborting retry because of the exception type {type(err)!s}." 
) LOGGER.info(msg) @@ -123,17 +129,17 @@ class RetryDiscipline(Discipline): stop = False if stop: break - time.sleep(self._wait_time) + time.sleep(self.wait_time) if not stop and current_error is not None: - LOGGER.error( - "Failed to execute discipline %s after %s attempts.", - self._discipline.name, - self.n_retry, + msg = ( + f"Failed to execute discipline {self.__discipline.name} " + f"after {self.n_max_retry} attempts.", ) + LOGGER.error(msg) raise current_error LOGGER.debug("Transfer output data in RetryDiscipline local data.") - return self._discipline.get_output_data() + return self.__discipline.get_output_data() def _execute_discipline(self, input_data: StrKeyMapping) -> None: """Execute the discipline. @@ -144,7 +150,7 @@ class RetryDiscipline(Discipline): with ProcessPoolExecutor() as executor: run_discipline = executor.submit( - self._discipline.execute, + self.__discipline.execute, input_data, ) @@ -152,7 +158,7 @@ class RetryDiscipline(Discipline): LOGGER.debug("Running discipline and get the output") process_results = run_discipline.result(timeout=self.disc_timeout) - self._discipline.local_data.update(process_results) + self.__discipline.local_data.update(process_results) except (cfutures._base.TimeoutError, TimeoutError): """Runtime exceeds timeout, killing process and its children.""" diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 24c87975fc..8c31e547fb 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -39,18 +39,20 @@ if TYPE_CHECKING: LOGGER = getLogger(__name__) +@pytest.fixture def an_analytic_discipline() -> Discipline: """Analytic discipline.""" return create_discipline("AnalyticDiscipline", expressions={"y": "2*x+1"}) +@pytest.fixture def a_crashing_analytic_discipline() -> Discipline: """Analytic discipline crashing when x=0.""" return create_discipline("AnalyticDiscipline", 
expressions={"y": "1.0/x+1"}) class CrashingDisciplineInRun(Discipline): - """A discipline rising NotImplementedError in _run.""" + """A discipline rising NotImplementedError in ``_run``.""" def __init__(self, name: str = "", a_string=None) -> None: """A discipline rising NotImplementedError in _run.""" @@ -79,9 +81,9 @@ class DisciplineLongTimeRunning(Discipline): x = x + math.cos(i) * math.sin(j) -def test_retry_discipline() -> None: +def test_retry_discipline(an_analytic_discipline) -> None: """Test discipline, no timeout set.""" - analytic_disc = an_analytic_discipline() + analytic_disc = an_analytic_discipline retry_discipline = RetryDiscipline(analytic_disc) @@ -92,28 +94,25 @@ def test_retry_discipline() -> None: assert retry_discipline.n_executions == 1 -def test_retry_discipline_with_timeout() -> None: +def test_retry_discipline_with_timeout(an_analytic_discipline) -> None: """Test discipline with a timeout.""" - analytic_disc = an_analytic_discipline() - timeout = 10 + analytic_disc = an_analytic_discipline - disc_with_timeout = RetryDiscipline(analytic_disc, disc_timeout=timeout) + disc_with_timeout = RetryDiscipline(analytic_disc, disc_timeout=10) - inputs = {"x": array([4.0])} - disc_with_timeout.execute(inputs) + disc_with_timeout.execute({"x": array([4.0])}) assert disc_with_timeout.local_data == {"x": array([4.0]), "y": array([9.0])} assert disc_with_timeout.n_executions == 1 -def test_failure_retry_discipline_with_timeout() -> None: +@pytest.mark.parametrize("n_retry", [1, 2, 3]) +def test_failure_retry_discipline_with_timeout(an_analytic_discipline, n_retry) -> None: """Test failure of the discipline with a too much very short timeout.""" - analytic_disc = an_analytic_discipline() - timeout = 0.0001 - n_retry = 1 + analytic_disc = an_analytic_discipline disc_with_timeout = RetryDiscipline( - analytic_disc, disc_timeout=timeout, n_retry=n_retry + analytic_disc, disc_timeout=1e-4, n_max_retry=n_retry ) inputs = {"x": array([4.0])} @@ -125,21 +124,22 
@@ def test_failure_retry_discipline_with_timeout() -> None: assert disc_with_timeout.n_executions == n_retry -def test_failure_zero_division_error() -> None: +def test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: """Test failure of the discipline with a bad x entry. In order to catch the ZeroDivisionError, set n_retry=1 """ - analytic_disc = a_crashing_analytic_discipline() - disc = RetryDiscipline(analytic_disc, n_retry=1) + analytic_disc = a_crashing_analytic_discipline + disc = RetryDiscipline(analytic_disc, n_max_retry=1) - inputs = {"x": array([0.0])} with pytest.raises(ZeroDivisionError): - disc.execute(inputs) + disc.execute({"x": array([0.0])}) @pytest.mark.parametrize("n_try", [1, 3]) -def test_failure_zero_division_error_with_timeout(n_try: int) -> None: +def test_failure_zero_division_error_with_timeout( + a_crashing_analytic_discipline, n_try: int +) -> None: """Test failure of the discipline with timeout and a bad x entry. In order to catch the ZeroDivisionError that arises before timeout (5s), test with @@ -148,17 +148,16 @@ def test_failure_zero_division_error_with_timeout(n_try: int) -> None: abort_exceptions = (ZeroDivisionError, FloatingPointError, OverflowError) - analytic_disc = a_crashing_analytic_discipline() + analytic_disc = a_crashing_analytic_discipline disc = RetryDiscipline( analytic_disc, - n_retry=n_try, + n_max_retry=n_try, disc_timeout=5.0, abort_exceptions=abort_exceptions, ) - inputs = {"x": array([0.0])} with pytest.raises(ZeroDivisionError) as err: - disc.execute(inputs) + disc.execute({"x": array([0.0])}) assert disc.n_executions == 1 assert err.type is ZeroDivisionError @@ -166,25 +165,22 @@ def test_failure_zero_division_error_with_timeout(n_try: int) -> None: assert str(err.value) == "float division by zero" -def test_retry_discipline_with_abort_exceptions() -> None: +def test_retry_discipline_with_abort_exceptions(a_crashing_analytic_discipline) -> None: """Test discipline with 
a_crashing_analytic_discipline and a tuple of a tuple of abort_exceptions that abort the retry (ZeroDivisionError). """ - n_retry = 5 - disc_timeout = 100.0 abort_exceptions = (ZeroDivisionError, FloatingPointError, OverflowError) - analytic_disc = a_crashing_analytic_discipline() + analytic_disc = a_crashing_analytic_discipline retry_discipline = RetryDiscipline( analytic_disc, - n_retry=n_retry, - disc_timeout=disc_timeout, + n_max_retry=5, + disc_timeout=100.0, abort_exceptions=abort_exceptions, ) - inputs = {"x": array([0.0])} with pytest.raises(ZeroDivisionError) as err: - retry_discipline.execute(inputs) + retry_discipline.execute({"x": array([0.0])}) assert retry_discipline.n_executions == 1 assert err.type is ZeroDivisionError @@ -197,8 +193,6 @@ def test_a_not_implemented_error_analytic_discipline() -> None: a tuple of abort_exceptions that abort the retry (ZeroDivisionError). """ - n_retry = 5 - disc_timeout = 100.0 abort_exceptions = ( ZeroDivisionError, FloatingPointError, @@ -209,15 +203,13 @@ def test_a_not_implemented_error_analytic_discipline() -> None: analytic_disc = CrashingDisciplineInRun(name="Crash_run", a_string="Hello") retry_discipline = RetryDiscipline( analytic_disc, - n_retry=n_retry, - disc_timeout=disc_timeout, + n_max_retry=5, + disc_timeout=100.0, abort_exceptions=abort_exceptions, ) - inputs = {"x": array([1.0])} - with pytest.raises(NotImplementedError) as err: - retry_discipline.execute(inputs) + retry_discipline.execute({"x": array([1.0])}) assert retry_discipline.n_executions == 1 assert err.type is NotImplementedError @@ -227,15 +219,12 @@ def test_a_not_implemented_error_analytic_discipline() -> None: def test_retry_discipline_timeout_feature() -> None: """Test the timeout feature of discipline with a long computation.""" disc = DisciplineLongTimeRunning() - timeout = 3.0 n_retry = 1 - disc_with_timeout = RetryDiscipline(disc, disc_timeout=timeout, n_retry=n_retry) - - inputs = {"x": array([0.0])} + disc_with_timeout = 
RetryDiscipline(disc, disc_timeout=3.0, n_max_retry=n_retry) with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: - disc_with_timeout.execute(inputs) + disc_with_timeout.execute({"x": array([0.0])}) assert timeout_exec.typename == "TimeoutError" assert disc_with_timeout.n_executions == n_retry -- GitLab From 325de924e506daeb03ca8db0c9f47d2f0e1cdcff Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 15 Jan 2025 17:31:21 +0100 Subject: [PATCH 17/58] test: remove test_retry_discipline_with_abort_exceptions, and in discipline modify default to 0 for wait_time --- .../disciplines/wrappers/retry_discipline.py | 2 +- .../wrappers/test_retry_discipline.py | 23 ------------------- 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index d34388a09e..a3dfed4faf 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -62,7 +62,7 @@ class RetryDiscipline(Discipline): self, discipline: Discipline, n_max_retry: int = 5, - wait_time: float = 1, + wait_time: float = 0, disc_timeout: float | None = None, abort_exceptions: Iterable[type[Exception]] = (), ) -> None: diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 8c31e547fb..1772f9aecd 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -165,29 +165,6 @@ def test_failure_zero_division_error_with_timeout( assert str(err.value) == "float division by zero" -def test_retry_discipline_with_abort_exceptions(a_crashing_analytic_discipline) -> None: - """Test discipline with a_crashing_analytic_discipline and a tuple of - - a tuple of abort_exceptions that abort the retry (ZeroDivisionError). 
- """ - abort_exceptions = (ZeroDivisionError, FloatingPointError, OverflowError) - analytic_disc = a_crashing_analytic_discipline - retry_discipline = RetryDiscipline( - analytic_disc, - n_max_retry=5, - disc_timeout=100.0, - abort_exceptions=abort_exceptions, - ) - - with pytest.raises(ZeroDivisionError) as err: - retry_discipline.execute({"x": array([0.0])}) - - assert retry_discipline.n_executions == 1 - assert err.type is ZeroDivisionError - assert err.typename == "ZeroDivisionError" - assert str(err.value) == "float division by zero" - - def test_a_not_implemented_error_analytic_discipline() -> None: """Test discipline with a_crashing_discipline_in_run and a tuple of -- GitLab From 0b08012c8d673fdcf125af4f169990f60665eda2 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Thu, 16 Jan 2025 16:47:50 +0100 Subject: [PATCH 18/58] refactor: modify some variable types, default values, and extended test with Exception errors --- .../disciplines/wrappers/retry_discipline.py | 13 +++---- .../wrappers/test_retry_discipline.py | 35 +++++++++++++++---- 2 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index a3dfed4faf..1552b61ff2 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -63,17 +63,18 @@ class RetryDiscipline(Discipline): discipline: Discipline, n_max_retry: int = 5, wait_time: float = 0, - disc_timeout: float | None = None, + disc_timeout: float = 0.0, abort_exceptions: Iterable[type[Exception]] = (), ) -> None: """ Args: discipline: The discipline to wrap in the retry loop. n_max_retry: The maximum number of retry of the discipline. - wait_time: The time to wait between 2 retries (in seconds). + wait_time: The time to wait between 2 retries (in seconds). Default: 0 s disc_timeout : The maximum duration, in seconds, that the discipline is allowed to run. 
If this time limit is exceeded, the - execution is terminated. + execution is terminated. If disc_timeout = 0, it means the + discipline is executed without timeout limit. abort_exceptions: The exceptions for which the code raises an exception and exit immediately without retrying a run. @@ -105,7 +106,7 @@ class RetryDiscipline(Discipline): """If no timeout set, normal execution inside the "retry" loop else enclose the discipline in ProcessPoolExecutor to run it. """ - if self.disc_timeout is None: + if self.disc_timeout == 0.0: process_results = self.__discipline.execute(input_data) self.__discipline.local_data.update(process_results) else: @@ -144,7 +145,7 @@ class RetryDiscipline(Discipline): def _execute_discipline(self, input_data: StrKeyMapping) -> None: """Execute the discipline. - disc_timeout is set to n seconds, or None + disc_timeout is set to n seconds. """ LOGGER.debug("time_out set to : %s s", self.disc_timeout) @@ -178,7 +179,7 @@ class RetryDiscipline(Discipline): err_msg = ( f"Process stopped as it exceeds timeout ({self.disc_timeout!s} s)" ) - LOGGER.info(err_msg) + LOGGER.exception(err_msg) raise except Exception as err: # noqa: BLE001 diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 1772f9aecd..dac234b1f6 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -34,6 +34,8 @@ from gemseo.core.discipline import Discipline from gemseo.disciplines.wrappers.retry_discipline import RetryDiscipline if TYPE_CHECKING: + from collections.abc import Iterable + from gemseo.typing import StrKeyMapping LOGGER = getLogger(__name__) @@ -51,6 +53,16 @@ def a_crashing_analytic_discipline() -> Discipline: return create_discipline("AnalyticDiscipline", expressions={"y": "1.0/x+1"}) +@pytest.fixture +def a_crashing_discipline_in_run() -> Discipline: + return CrashingDisciplineInRun(name="Crash_run", a_string="Hello") + + 
+@pytest.fixture +def a_long_time_running_discipline() -> Discipline: + return DisciplineLongTimeRunning() + + class CrashingDisciplineInRun(Discipline): """A discipline rising NotImplementedError in ``_run``.""" @@ -136,9 +148,17 @@ def test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: disc.execute({"x": array([0.0])}) +@pytest.mark.parametrize( + "tuple_errors", + [ + (ZeroDivisionError,), + (ZeroDivisionError, FloatingPointError, OverflowError), + (OverflowError, FloatingPointError, ZeroDivisionError), + ], +) @pytest.mark.parametrize("n_try", [1, 3]) def test_failure_zero_division_error_with_timeout( - a_crashing_analytic_discipline, n_try: int + n_try: int, tuple_errors: Iterable[type[Exception]], a_crashing_analytic_discipline ) -> None: """Test failure of the discipline with timeout and a bad x entry. @@ -146,7 +166,7 @@ def test_failure_zero_division_error_with_timeout( n_retry=1 and 3 to be sure every case is ok. """ - abort_exceptions = (ZeroDivisionError, FloatingPointError, OverflowError) + abort_exceptions = tuple_errors analytic_disc = a_crashing_analytic_discipline disc = RetryDiscipline( @@ -165,7 +185,9 @@ def test_failure_zero_division_error_with_timeout( assert str(err.value) == "float division by zero" -def test_a_not_implemented_error_analytic_discipline() -> None: +def test_a_not_implemented_error_analytic_discipline( + a_crashing_discipline_in_run, +) -> None: """Test discipline with a_crashing_discipline_in_run and a tuple of a tuple of abort_exceptions that abort the retry (ZeroDivisionError). 
@@ -177,7 +199,8 @@ def test_a_not_implemented_error_analytic_discipline() -> None: NotImplementedError, ) - analytic_disc = CrashingDisciplineInRun(name="Crash_run", a_string="Hello") + analytic_disc = a_crashing_discipline_in_run + # analytic_disc = a_crashing_discipline_in_run(name="Crash_run", a_string="Hello") retry_discipline = RetryDiscipline( analytic_disc, n_max_retry=5, @@ -193,9 +216,9 @@ def test_a_not_implemented_error_analytic_discipline() -> None: assert err.typename == "NotImplementedError" -def test_retry_discipline_timeout_feature() -> None: +def test_retry_discipline_timeout_feature(a_long_time_running_discipline) -> None: """Test the timeout feature of discipline with a long computation.""" - disc = DisciplineLongTimeRunning() + disc = a_long_time_running_discipline n_retry = 1 disc_with_timeout = RetryDiscipline(disc, disc_timeout=3.0, n_max_retry=n_retry) -- GitLab From 151fcce52a873611198b5feacf3ddf17584ff526 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Fri, 17 Jan 2025 11:59:26 +0100 Subject: [PATCH 19/58] refactor: in tests, removing pointless declarations. In discipline: modify 'if' statement for timeout --- .../disciplines/wrappers/retry_discipline.py | 2 +- .../wrappers/test_retry_discipline.py | 66 +++++++------------ 2 files changed, 24 insertions(+), 44 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 1552b61ff2..6f174b62ad 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -106,7 +106,7 @@ class RetryDiscipline(Discipline): """If no timeout set, normal execution inside the "retry" loop else enclose the discipline in ProcessPoolExecutor to run it. 
""" - if self.disc_timeout == 0.0: + if not self.disc_timeout: process_results = self.__discipline.execute(input_data) self.__discipline.local_data.update(process_results) else: diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index dac234b1f6..c69634f21c 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -95,12 +95,8 @@ class DisciplineLongTimeRunning(Discipline): def test_retry_discipline(an_analytic_discipline) -> None: """Test discipline, no timeout set.""" - analytic_disc = an_analytic_discipline - - retry_discipline = RetryDiscipline(analytic_disc) - - inputs = {"x": array([4.0])} - retry_discipline.execute(inputs) + retry_discipline = RetryDiscipline(an_analytic_discipline) + retry_discipline.execute({"x": array([4.0])}) assert retry_discipline.local_data == {"x": array([4.0]), "y": array([9.0])} assert retry_discipline.n_executions == 1 @@ -108,10 +104,7 @@ def test_retry_discipline(an_analytic_discipline) -> None: def test_retry_discipline_with_timeout(an_analytic_discipline) -> None: """Test discipline with a timeout.""" - analytic_disc = an_analytic_discipline - - disc_with_timeout = RetryDiscipline(analytic_disc, disc_timeout=10) - + disc_with_timeout = RetryDiscipline(an_analytic_discipline, disc_timeout=10) disc_with_timeout.execute({"x": array([4.0])}) assert disc_with_timeout.local_data == {"x": array([4.0]), "y": array([9.0])} @@ -121,16 +114,11 @@ def test_retry_discipline_with_timeout(an_analytic_discipline) -> None: @pytest.mark.parametrize("n_retry", [1, 2, 3]) def test_failure_retry_discipline_with_timeout(an_analytic_discipline, n_retry) -> None: """Test failure of the discipline with a too much very short timeout.""" - analytic_disc = an_analytic_discipline - disc_with_timeout = RetryDiscipline( - analytic_disc, disc_timeout=1e-4, n_max_retry=n_retry + an_analytic_discipline, disc_timeout=1e-4, 
n_max_retry=n_retry ) - - inputs = {"x": array([4.0])} - with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: - disc_with_timeout.execute(inputs) + disc_with_timeout.execute({"x": array([4.0])}) assert timeout_exec.typename == "TimeoutError" assert disc_with_timeout.n_executions == n_retry @@ -141,12 +129,14 @@ def test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: In order to catch the ZeroDivisionError, set n_retry=1 """ - analytic_disc = a_crashing_analytic_discipline - disc = RetryDiscipline(analytic_disc, n_max_retry=1) - - with pytest.raises(ZeroDivisionError): + disc = RetryDiscipline(a_crashing_analytic_discipline, n_max_retry=1) + with pytest.raises(ZeroDivisionError) as err: disc.execute({"x": array([0.0])}) + assert err.type is ZeroDivisionError + assert err.typename == "ZeroDivisionError" + assert str(err.value) == "float division by zero" + @pytest.mark.parametrize( "tuple_errors", @@ -165,17 +155,12 @@ def test_failure_zero_division_error_with_timeout( In order to catch the ZeroDivisionError that arises before timeout (5s), test with n_retry=1 and 3 to be sure every case is ok. """ - - abort_exceptions = tuple_errors - - analytic_disc = a_crashing_analytic_discipline disc = RetryDiscipline( - analytic_disc, + a_crashing_analytic_discipline, n_max_retry=n_try, disc_timeout=5.0, - abort_exceptions=abort_exceptions, + abort_exceptions=tuple_errors, ) - with pytest.raises(ZeroDivisionError) as err: disc.execute({"x": array([0.0])}) @@ -192,22 +177,17 @@ def test_a_not_implemented_error_analytic_discipline( a tuple of abort_exceptions that abort the retry (ZeroDivisionError). 
""" - abort_exceptions = ( - ZeroDivisionError, - FloatingPointError, - OverflowError, - NotImplementedError, - ) - - analytic_disc = a_crashing_discipline_in_run - # analytic_disc = a_crashing_discipline_in_run(name="Crash_run", a_string="Hello") retry_discipline = RetryDiscipline( - analytic_disc, + a_crashing_discipline_in_run, n_max_retry=5, disc_timeout=100.0, - abort_exceptions=abort_exceptions, + abort_exceptions=( + ZeroDivisionError, + FloatingPointError, + OverflowError, + NotImplementedError, + ), ) - with pytest.raises(NotImplementedError) as err: retry_discipline.execute({"x": array([1.0])}) @@ -218,11 +198,11 @@ def test_a_not_implemented_error_analytic_discipline( def test_retry_discipline_timeout_feature(a_long_time_running_discipline) -> None: """Test the timeout feature of discipline with a long computation.""" - disc = a_long_time_running_discipline n_retry = 1 - disc_with_timeout = RetryDiscipline(disc, disc_timeout=3.0, n_max_retry=n_retry) - + disc_with_timeout = RetryDiscipline( + a_long_time_running_discipline, disc_timeout=3.0, n_max_retry=n_retry + ) with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: disc_with_timeout.execute({"x": array([0.0])}) -- GitLab From 805f6c19c04305465e3f7366e338894f1bd1bd9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Mon, 20 Jan 2025 16:20:58 +0000 Subject: [PATCH 20/58] doc: apply suggested change during review --- changelog/fragments/927.added.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/fragments/927.added.rst b/changelog/fragments/927.added.rst index e1649f26a6..4253f4d630 100644 --- a/changelog/fragments/927.added.rst +++ b/changelog/fragments/927.added.rst @@ -1 +1 @@ -- New discipline RetryDiscipline : Wraps a discipline to retry the execution several times. Try to execute the discipline, if it raises an exception retries up to a maximum number of attempts. 
Can pass a tuple of exceptions that, if one of them raised, do not retry the execution. +New discipline RetryDiscipline: it wraps a discipline to retry the execution several times. It tries to execute the discipline, if it raises an exception then it retries up to a maximum number of attempts. It can pass a tuple of exceptions that, if one of them raised, do not retry the execution. -- GitLab From eb7523f40c53cf5be1afee853900973baa657b81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Mon, 20 Jan 2025 16:21:30 +0000 Subject: [PATCH 21/58] doc: apply suggested change during review --- src/gemseo/disciplines/wrappers/retry_discipline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 6f174b62ad..df7e8552ff 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -50,7 +50,7 @@ class RetryDiscipline(Discipline): A timeout in seconds can be specified to prevent executions from becoming stuck. - User can also provide a tuple of :class:`.Exceptions that, if one of them raised, + User can also provide a tuple of :class:`.Exceptions` that, if one of them raised, it does not retry the execution. 
""" -- GitLab From 9bcfc1ec762f0d86b988790d47d1e0411f62ac19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Mon, 20 Jan 2025 16:21:44 +0000 Subject: [PATCH 22/58] doc: apply suggested change during review --- src/gemseo/disciplines/wrappers/retry_discipline.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index df7e8552ff..428bae92b9 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -52,7 +52,6 @@ class RetryDiscipline(Discipline): User can also provide a tuple of :class:`.Exceptions` that, if one of them raised, it does not retry the execution. - """ n_executions: int -- GitLab From a9cb6842da4bec983eeab09e5f80bf49d1b70d9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Mon, 20 Jan 2025 16:22:01 +0000 Subject: [PATCH 23/58] doc: apply suggested change during review --- src/gemseo/disciplines/wrappers/retry_discipline.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 428bae92b9..5a12df95f8 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -84,9 +84,8 @@ class RetryDiscipline(Discipline): """ # noqa:D205 D212 D415 super().__init__(discipline.name) self.__discipline = discipline - self.input_grammar = discipline.input_grammar - self.output_grammar = discipline.output_grammar - self.default_input_data = discipline.default_input_data + self.io.input_grammar = discipline.io.input_grammar + self.io.output_grammar = discipline.io.output_grammar self.n_max_retry = n_max_retry self.wait_time = wait_time self.disc_timeout = disc_timeout -- GitLab From 196e9e13144032864bdb2674cd6aebda4340de36 Mon Sep 17 00:00:00 2001 
From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Mon, 20 Jan 2025 16:25:55 +0000 Subject: [PATCH 24/58] doc: apply suggested change during review --- src/gemseo/disciplines/wrappers/retry_discipline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 5a12df95f8..ebf8d43cfd 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -61,7 +61,7 @@ class RetryDiscipline(Discipline): self, discipline: Discipline, n_max_retry: int = 5, - wait_time: float = 0, + wait_time: float = 0., disc_timeout: float = 0.0, abort_exceptions: Iterable[type[Exception]] = (), ) -> None: -- GitLab From ebef8bec80170578335e2fce3214b210aec853e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Mon, 20 Jan 2025 16:26:18 +0000 Subject: [PATCH 25/58] doc: apply suggested change during review --- src/gemseo/disciplines/wrappers/retry_discipline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index ebf8d43cfd..d323cc5148 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -91,7 +91,7 @@ class RetryDiscipline(Discipline): self.disc_timeout = disc_timeout self.abort_exceptions = abort_exceptions - def _run(self, input_data: StrKeyMapping | None = None) -> StrKeyMapping | None: + def _run(self, input_data: StrKeyMapping) -> StrKeyMapping | None: current_error = None self.n_executions = 0 -- GitLab From 09ed9f19554d865478606e882f526e856ad249c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Mon, 20 Jan 2025 16:26:48 +0000 Subject: [PATCH 26/58] doc: apply suggested change during review --- 
src/gemseo/disciplines/wrappers/retry_discipline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index d323cc5148..c62d2f4be9 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -116,7 +116,7 @@ class RetryDiscipline(Discipline): stop = False continue except Exception as err: # noqa: BLE001 - if type(err) in self.abort_exceptions: + if isinstance(err, self.abort_exceptions): msg = ( f"Failed to execute discipline {self.__discipline.name}, " f"aborting retry because of the exception type {type(err)!s}." -- GitLab From cfca79d5fe3a2fa618f62380fabb6ab6f2e584b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Mon, 20 Jan 2025 16:27:43 +0000 Subject: [PATCH 27/58] doc: apply suggested change during review --- src/gemseo/disciplines/wrappers/retry_discipline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index c62d2f4be9..61553e0db9 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -69,7 +69,7 @@ class RetryDiscipline(Discipline): Args: discipline: The discipline to wrap in the retry loop. n_max_retry: The maximum number of retry of the discipline. - wait_time: The time to wait between 2 retries (in seconds). Default: 0 s + wait_time: The time to wait between 2 retries (in seconds). disc_timeout : The maximum duration, in seconds, that the discipline is allowed to run. If this time limit is exceeded, the execution is terminated. 
If disc_timeout = 0, it means the -- GitLab From dd608eb22e816d47d6b994ff93ed4c71e92a4a8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Mon, 20 Jan 2025 16:28:00 +0000 Subject: [PATCH 28/58] doc: apply suggested change during review --- src/gemseo/disciplines/wrappers/retry_discipline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 61553e0db9..286f67b48c 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -72,7 +72,7 @@ class RetryDiscipline(Discipline): wait_time: The time to wait between 2 retries (in seconds). disc_timeout : The maximum duration, in seconds, that the discipline is allowed to run. If this time limit is exceeded, the - execution is terminated. If disc_timeout = 0, it means the + execution is terminated. If ``0``, the discipline is executed without timeout limit. abort_exceptions: The exceptions for which the code raises an exception and exit immediately without retrying a run. 
-- GitLab From 71bbfd6a2299744ed2d1d227dcbe7d06e76c8a8a Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Tue, 21 Jan 2025 14:10:57 +0100 Subject: [PATCH 29/58] refactor: modify some variable naming --- .../disciplines/wrappers/retry_discipline.py | 72 +++++++++++-------- .../wrappers/test_retry_discipline.py | 22 +++--- 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 286f67b48c..2dacf88a95 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -12,13 +12,7 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -"""The retry discipline module. - -A RetryDiscipline allows a discipline to be executed multiple times (up to a -specified number of retries) if the previous attempts fail to produce any result. - -For example, a timeout can be specified to prevent executions from becoming stuck. 
-""" +"""The retry discipline module.""" from __future__ import annotations @@ -57,28 +51,41 @@ class RetryDiscipline(Discipline): n_executions: int """Number of performed executions of the discipline.""" + n_retry: int + """The maximum number of retry of the discipline.""" + + wait_time: float + """The time to wait between 2 retries (in seconds).""" + + timeout: float + """The maximum duration, in seconds, that the discipline is allowed to run.""" + + fatal_exceptions: Iterable[type[Exception]] + """The exceptions for which the code raises an exception and exit immediately + without retrying a run.""" + def __init__( self, discipline: Discipline, - n_max_retry: int = 5, - wait_time: float = 0., - disc_timeout: float = 0.0, - abort_exceptions: Iterable[type[Exception]] = (), + n_retry: int = 5, + wait_time: float = 0.0, + timeout: float = 0.0, + fatal_exceptions: Iterable[type[Exception]] = (), ) -> None: """ Args: discipline: The discipline to wrap in the retry loop. - n_max_retry: The maximum number of retry of the discipline. + n_retry: The maximum number of retry of the discipline. wait_time: The time to wait between 2 retries (in seconds). - disc_timeout : The maximum duration, in seconds, that the discipline is + timeout : The maximum duration, in seconds, that the discipline is allowed to run. If this time limit is exceeded, the execution is terminated. If ``0``, the discipline is executed without timeout limit. - abort_exceptions: The exceptions for which the code raises an + fatal_exceptions: The exceptions for which the code raises an exception and exit immediately without retrying a run. Raises: - TimeoutError: if disc_timeout limit is reached. + TimeoutError: if timeout limit is reached. Other exceptions if issue encountered during the execution of 'discipline'. 
""" # noqa:D205 D212 D415 @@ -86,25 +93,28 @@ class RetryDiscipline(Discipline): self.__discipline = discipline self.io.input_grammar = discipline.io.input_grammar self.io.output_grammar = discipline.io.output_grammar - self.n_max_retry = n_max_retry + self.n_retry = n_retry self.wait_time = wait_time - self.disc_timeout = disc_timeout - self.abort_exceptions = abort_exceptions + self.timeout = timeout + self.fatal_exceptions = fatal_exceptions + self.n_executions = 0 def _run(self, input_data: StrKeyMapping) -> StrKeyMapping | None: current_error = None self.n_executions = 0 stop = True - for _n in range(1, self.n_max_retry + 1): + for n_try in range(1, self.n_retry + 1): stop = True self.n_executions += 1 + msg = f"Trying to run the discipline. Attempt {n_try}/{self.n_retry}" + LOGGER.debug(msg) + try: - """If no timeout set, normal execution inside the "retry" loop - else enclose the discipline in ProcessPoolExecutor to run it. - """ - if not self.disc_timeout: + # If no timeout set, normal execution inside the "retry" loop + # else enclose the discipline in ProcessPoolExecutor to run it. + if not self.timeout: process_results = self.__discipline.execute(input_data) self.__discipline.local_data.update(process_results) else: @@ -116,7 +126,8 @@ class RetryDiscipline(Discipline): stop = False continue except Exception as err: # noqa: BLE001 - if isinstance(err, self.abort_exceptions): + if isinstance(err, self.fatal_exceptions): + # if err in self.fatal_exceptions: msg = ( f"Failed to execute discipline {self.__discipline.name}, " f"aborting retry because of the exception type {type(err)!s}." 
@@ -132,7 +143,7 @@ class RetryDiscipline(Discipline): if not stop and current_error is not None: msg = ( f"Failed to execute discipline {self.__discipline.name} " - f"after {self.n_max_retry} attempts.", + f"after {self.n_retry} attempts.", ) LOGGER.error(msg) raise current_error @@ -143,9 +154,10 @@ class RetryDiscipline(Discipline): def _execute_discipline(self, input_data: StrKeyMapping) -> None: """Execute the discipline. - disc_timeout is set to n seconds. + Args: + input_data : the input passed to the discipline. """ - LOGGER.debug("time_out set to : %s s", self.disc_timeout) + LOGGER.debug("time_out set to : %s s", self.timeout) with ProcessPoolExecutor() as executor: run_discipline = executor.submit( @@ -156,7 +168,7 @@ class RetryDiscipline(Discipline): try: LOGGER.debug("Running discipline and get the output") - process_results = run_discipline.result(timeout=self.disc_timeout) + process_results = run_discipline.result(timeout=self.timeout) self.__discipline.local_data.update(process_results) except (cfutures._base.TimeoutError, TimeoutError): @@ -174,9 +186,7 @@ class RetryDiscipline(Discipline): for pid in pid_child: os.kill(pid, signal.SIGTERM) - err_msg = ( - f"Process stopped as it exceeds timeout ({self.disc_timeout!s} s)" - ) + err_msg = f"Process stopped as it exceeds timeout ({self.timeout!s} s)" LOGGER.exception(err_msg) raise diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index c69634f21c..2b5780fa86 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -104,7 +104,7 @@ def test_retry_discipline(an_analytic_discipline) -> None: def test_retry_discipline_with_timeout(an_analytic_discipline) -> None: """Test discipline with a timeout.""" - disc_with_timeout = RetryDiscipline(an_analytic_discipline, disc_timeout=10) + disc_with_timeout = RetryDiscipline(an_analytic_discipline, timeout=10) 
disc_with_timeout.execute({"x": array([4.0])}) assert disc_with_timeout.local_data == {"x": array([4.0]), "y": array([9.0])} @@ -115,7 +115,7 @@ def test_retry_discipline_with_timeout(an_analytic_discipline) -> None: def test_failure_retry_discipline_with_timeout(an_analytic_discipline, n_retry) -> None: """Test failure of the discipline with a too much very short timeout.""" disc_with_timeout = RetryDiscipline( - an_analytic_discipline, disc_timeout=1e-4, n_max_retry=n_retry + an_analytic_discipline, timeout=1e-4, n_retry=n_retry ) with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: disc_with_timeout.execute({"x": array([4.0])}) @@ -129,7 +129,7 @@ def test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: In order to catch the ZeroDivisionError, set n_retry=1 """ - disc = RetryDiscipline(a_crashing_analytic_discipline, n_max_retry=1) + disc = RetryDiscipline(a_crashing_analytic_discipline, n_retry=1) with pytest.raises(ZeroDivisionError) as err: disc.execute({"x": array([0.0])}) @@ -157,9 +157,9 @@ def test_failure_zero_division_error_with_timeout( """ disc = RetryDiscipline( a_crashing_analytic_discipline, - n_max_retry=n_try, - disc_timeout=5.0, - abort_exceptions=tuple_errors, + n_retry=n_try, + timeout=5.0, + fatal_exceptions=tuple_errors, ) with pytest.raises(ZeroDivisionError) as err: disc.execute({"x": array([0.0])}) @@ -175,13 +175,13 @@ def test_a_not_implemented_error_analytic_discipline( ) -> None: """Test discipline with a_crashing_discipline_in_run and a tuple of - a tuple of abort_exceptions that abort the retry (ZeroDivisionError). + a tuple of fatal_exceptions that abort the retry (ZeroDivisionError). 
""" retry_discipline = RetryDiscipline( a_crashing_discipline_in_run, - n_max_retry=5, - disc_timeout=100.0, - abort_exceptions=( + n_retry=5, + timeout=100.0, + fatal_exceptions=( ZeroDivisionError, FloatingPointError, OverflowError, @@ -201,7 +201,7 @@ def test_retry_discipline_timeout_feature(a_long_time_running_discipline) -> Non n_retry = 1 disc_with_timeout = RetryDiscipline( - a_long_time_running_discipline, disc_timeout=3.0, n_max_retry=n_retry + a_long_time_running_discipline, timeout=3.0, n_retry=n_retry ) with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: disc_with_timeout.execute({"x": array([0.0])}) -- GitLab From 0caf1d5b5eae56011c0cd7cc302eb63437c961c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Tue, 21 Jan 2025 15:02:06 +0000 Subject: [PATCH 30/58] Apply 1 suggestion(s) to 1 file(s) Co-authored-by: Antoine Dechaume <7801279-AntoineD@users.noreply.gitlab.com> --- src/gemseo/disciplines/wrappers/retry_discipline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 2dacf88a95..2b6b2bb0b0 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -49,7 +49,7 @@ class RetryDiscipline(Discipline): """ n_executions: int - """Number of performed executions of the discipline.""" + """The number of performed executions of the discipline.""" n_retry: int """The maximum number of retry of the discipline.""" -- GitLab From e9d5d918e6cbaeb101606b6e0a9ba596a82e8d6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Tue, 21 Jan 2025 15:07:05 +0000 Subject: [PATCH 31/58] doc: apply suggested change during review --- src/gemseo/disciplines/wrappers/retry_discipline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py 
b/src/gemseo/disciplines/wrappers/retry_discipline.py index 2b6b2bb0b0..ca4921e5c2 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -12,7 +12,7 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -"""The retry discipline module.""" +"""The retry discipline.""" from __future__ import annotations @@ -155,7 +155,7 @@ class RetryDiscipline(Discipline): """Execute the discipline. Args: - input_data : the input passed to the discipline. + input_data: The input data passed to the discipline. """ LOGGER.debug("time_out set to : %s s", self.timeout) -- GitLab From c4bcc9fa9431a295900c9fc674cacc8e6d379d6f Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Tue, 21 Jan 2025 16:33:18 +0100 Subject: [PATCH 32/58] refactor: modified discipline timeout default to math.inf, and some comments --- src/gemseo/disciplines/wrappers/retry_discipline.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index ca4921e5c2..2cd87ea007 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -17,6 +17,7 @@ from __future__ import annotations import concurrent.futures as cfutures +import math import os import signal import time @@ -52,7 +53,7 @@ class RetryDiscipline(Discipline): """The number of performed executions of the discipline.""" n_retry: int - """The maximum number of retry of the discipline.""" + """The number of retry of the discipline.""" wait_time: float """The time to wait between 2 retries (in seconds).""" @@ -69,17 +70,17 @@ class RetryDiscipline(Discipline): discipline: Discipline, n_retry: int = 5, wait_time: float = 0.0, -
timeout: float = 0.0, + timeout: float = math.inf, fatal_exceptions: Iterable[type[Exception]] = (), ) -> None: """ Args: discipline: The discipline to wrap in the retry loop. - n_retry: The maximum number of retry of the discipline. + n_retry: The number of retry of the discipline. wait_time: The time to wait between 2 retries (in seconds). timeout : The maximum duration, in seconds, that the discipline is allowed to run. If this time limit is exceeded, the - execution is terminated. If ``0``, the + execution is terminated. If ``math.inf``, the discipline is executed without timeout limit. fatal_exceptions: The exceptions for which the code raises an exception and exit immediately without retrying a run. @@ -114,7 +115,7 @@ class RetryDiscipline(Discipline): try: # If no timeout set, normal execution inside the "retry" loop # else enclose the discipline in ProcessPoolExecutor to run it. - if not self.timeout: + if math.isinf(self.timeout): process_results = self.__discipline.execute(input_data) self.__discipline.local_data.update(process_results) else: @@ -172,7 +173,7 @@ class RetryDiscipline(Discipline): self.__discipline.local_data.update(process_results) except (cfutures._base.TimeoutError, TimeoutError): - """Runtime exceeds timeout, killing process and its children.""" + # Runtime exceeds timeout, killing process and its children. 
LOGGER.debug("TimeoutError: Time out waiting for result()") -- GitLab From 8814f366a78d545e385ec3861c05135c6865d297 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Tue, 21 Jan 2025 16:48:11 +0100 Subject: [PATCH 33/58] docs: update docstring of the class, about timeout exceptions --- src/gemseo/disciplines/wrappers/retry_discipline.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 2cd87ea007..577a02122e 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -47,6 +47,12 @@ class RetryDiscipline(Discipline): User can also provide a tuple of :class:`.Exceptions` that, if one of them raised, it does not retry the execution. + + Please note the TimeoutError exception is also caught if the wrapped discipline + raise such an exception (i.e. aside from RetryDiscipline itself). So it could + lead to 2 surprising cases, but in fact normal cases: + - a TimeoutError exception even though the user didn't provide any timeout value. + - a TimeoutError raised sooner than the timeout value set by the user. """ n_executions: int -- GitLab From 527bb0b325fa8ee5a1a89b2f5c49d4661a2b749f Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Tue, 21 Jan 2025 17:36:30 +0100 Subject: [PATCH 34/58] refactor: modify LOGGER formatting --- .../disciplines/wrappers/retry_discipline.py | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 577a02122e..d8de179cc6 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -115,8 +115,11 @@ class RetryDiscipline(Discipline): stop = True self.n_executions += 1 - msg = f"Trying to run the discipline. 
Attempt {n_try}/{self.n_retry}" - LOGGER.debug(msg) + LOGGER.debug( + "Trying to run the discipline. Attempt %s/%s", + str(n_try), + str(self.n_retry), + ) try: # If no timeout set, normal execution inside the "retry" loop @@ -134,12 +137,12 @@ class RetryDiscipline(Discipline): continue except Exception as err: # noqa: BLE001 if isinstance(err, self.fatal_exceptions): - # if err in self.fatal_exceptions: - msg = ( - f"Failed to execute discipline {self.__discipline.name}, " - f"aborting retry because of the exception type {type(err)!s}." + LOGGER.info( + "Failed to execute discipline %s, " + "aborting retry because of the exception type %s.", + self.__discipline.name, + str(type(err)), ) - LOGGER.info(msg) raise current_error = err @@ -148,11 +151,11 @@ class RetryDiscipline(Discipline): break time.sleep(self.wait_time) if not stop and current_error is not None: - msg = ( - f"Failed to execute discipline {self.__discipline.name} " - f"after {self.n_retry} attempts.", + LOGGER.error( + "Failed to execute discipline %s after %s attempts.", + self.__discipline.name, + str(self.n_retry), ) - LOGGER.error(msg) raise current_error LOGGER.debug("Transfer output data in RetryDiscipline local data.") @@ -193,8 +196,9 @@ class RetryDiscipline(Discipline): for pid in pid_child: os.kill(pid, signal.SIGTERM) - err_msg = f"Process stopped as it exceeds timeout ({self.timeout!s} s)" - LOGGER.exception(err_msg) + LOGGER.exception( + "Process stopped as it exceeds timeout (%s s)", str(self.timeout) + ) raise except Exception as err: # noqa: BLE001 -- GitLab From d216f0797117fb29082e27fc774fc4fea17b7e44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Tue, 21 Jan 2025 16:39:06 +0000 Subject: [PATCH 35/58] doc: apply suggested change during review --- src/gemseo/disciplines/wrappers/retry_discipline.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py 
b/src/gemseo/disciplines/wrappers/retry_discipline.py index d8de179cc6..751f8718a3 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -48,11 +48,11 @@ class RetryDiscipline(Discipline): User can also provide a tuple of :class:`.Exceptions` that, if one of them raised, it does not retry the execution. - Please note the TimeoutError exception is also caught if the wrapped discipline - raise such an exception (i.e. aside from RetryDiscipline itself). So it could + Please note the ``TimeoutError`` exception is also caught if the wrapped discipline + raise such an exception (i.e. aside from ``RetryDiscipline`` itself). So it could lead to 2 surprising cases, but in fact normal cases: - - a TimeoutError exception even though the user didn't provide any timeout value. - - a TimeoutError raised sooner than the timeout value set by the user. + - a ``TimeoutError`` exception even though the user didn't provide any timeout value. + - a ``TimeoutError`` raised sooner than the ``timeout`` value set by the user. """ n_executions: int @@ -92,8 +92,8 @@ class RetryDiscipline(Discipline): exception and exit immediately without retrying a run. Raises: - TimeoutError: if timeout limit is reached. - Other exceptions if issue encountered during the execution of 'discipline'. + TimeoutError: If the ``timeout`` limit is reached. + Exception: Other exceptions if an issue is encountered during the execution of ``discipline``. 
""" # noqa:D205 D212 D415 super().__init__(discipline.name) -- GitLab From e78118cc90a44699b1d0fccf4ddad2d31c859b2c Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Tue, 21 Jan 2025 17:43:27 +0100 Subject: [PATCH 36/58] docs: reformatting some comments --- src/gemseo/disciplines/wrappers/retry_discipline.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 751f8718a3..86775dfea6 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -51,7 +51,8 @@ class RetryDiscipline(Discipline): Please note the ``TimeoutError`` exception is also caught if the wrapped discipline raise such an exception (i.e. aside from ``RetryDiscipline`` itself). So it could lead to 2 surprising cases, but in fact normal cases: - - a ``TimeoutError`` exception even though the user didn't provide any timeout value. + - a ``TimeoutError`` exception even though the user didn't provide any timeout + value. - a ``TimeoutError`` raised sooner than the ``timeout`` value set by the user. """ @@ -93,7 +94,8 @@ class RetryDiscipline(Discipline): Raises: TimeoutError: If the ``timeout`` limit is reached. - Exception: Other exceptions if an issue is encountered during the execution of ``discipline``. + Exception: Other exceptions if an issue is encountered during the + execution of ``discipline``. 
""" # noqa:D205 D212 D415 super().__init__(discipline.name) -- GitLab From bc8e6624199b1e517676b54b4cca4b50ca5b243c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Wed, 22 Jan 2025 09:51:31 +0000 Subject: [PATCH 37/58] docs: some syntax modifications --- src/gemseo/disciplines/wrappers/retry_discipline.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 86775dfea6..122a2e1c88 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -45,11 +45,11 @@ class RetryDiscipline(Discipline): A timeout in seconds can be specified to prevent executions from becoming stuck. - User can also provide a tuple of :class:`.Exceptions` that, if one of them raised, + Users can also provide a tuple of :class:`.Exception` that, if one of them is raised, it does not retry the execution. - Please note the ``TimeoutError`` exception is also caught if the wrapped discipline - raise such an exception (i.e. aside from ``RetryDiscipline`` itself). So it could + Please note that the ``TimeoutError`` exception is also caught if the wrapped discipline + raises such an exception (i.e. aside from ``RetryDiscipline`` itself). So it could lead to 2 surprising cases, but in fact normal cases: - a ``TimeoutError`` exception even though the user didn't provide any timeout value. 
-- GitLab From 7a1e9c560d926eb4694921ef106ac3aefc0e146c Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 22 Jan 2025 11:01:12 +0100 Subject: [PATCH 38/58] refactor: fix some LOGGER output formatting --- .../disciplines/wrappers/retry_discipline.py | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 122a2e1c88..7af4f2642c 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -45,12 +45,12 @@ class RetryDiscipline(Discipline): A timeout in seconds can be specified to prevent executions from becoming stuck. - Users can also provide a tuple of :class:`.Exception` that, if one of them is raised, - it does not retry the execution. + Users can also provide a tuple of :class:`.Exception` that, if one of them is + raised, it does not retry the execution. - Please note that the ``TimeoutError`` exception is also caught if the wrapped discipline - raises such an exception (i.e. aside from ``RetryDiscipline`` itself). So it could - lead to 2 surprising cases, but in fact normal cases: + Please note that the ``TimeoutError`` exception is also caught if the wrapped + discipline raises such an exception (i.e. aside from ``RetryDiscipline`` itself). + So it could lead to 2 surprising cases, but in fact normal cases: - a ``TimeoutError`` exception even though the user didn't provide any timeout value. - a ``TimeoutError`` raised sooner than the ``timeout`` value set by the user. @@ -118,9 +118,7 @@ class RetryDiscipline(Discipline): self.n_executions += 1 LOGGER.debug( - "Trying to run the discipline. Attempt %s/%s", - str(n_try), - str(self.n_retry), + "Trying to run the discipline. 
Attempt %d/%d", n_try, self.n_retry ) try: @@ -154,9 +152,9 @@ class RetryDiscipline(Discipline): time.sleep(self.wait_time) if not stop and current_error is not None: LOGGER.error( - "Failed to execute discipline %s after %s attempts.", + "Failed to execute discipline %s after %d attempts.", self.__discipline.name, - str(self.n_retry), + self.n_retry, ) raise current_error @@ -199,7 +197,7 @@ class RetryDiscipline(Discipline): os.kill(pid, signal.SIGTERM) LOGGER.exception( - "Process stopped as it exceeds timeout (%s s)", str(self.timeout) + "Process stopped as it exceeds timeout (%s s)", self.timeout ) raise -- GitLab From d27676fc5885b90547c33978af2726d2c0f23662 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 22 Jan 2025 12:05:02 +0100 Subject: [PATCH 39/58] refactor: modify try except introducing output_data variable --- .../disciplines/wrappers/retry_discipline.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 7af4f2642c..f6815c7e3b 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -116,6 +116,7 @@ class RetryDiscipline(Discipline): for n_try in range(1, self.n_retry + 1): stop = True self.n_executions += 1 + output_data = {} LOGGER.debug( "Trying to run the discipline. Attempt %d/%d", n_try, self.n_retry @@ -125,10 +126,9 @@ class RetryDiscipline(Discipline): # If no timeout set, normal execution inside the "retry" loop # else enclose the discipline in ProcessPoolExecutor to run it. 
if math.isinf(self.timeout): - process_results = self.__discipline.execute(input_data) - self.__discipline.local_data.update(process_results) + output_data = self.__discipline.execute(input_data) else: - self._execute_discipline(input_data) + output_data = self._execute_discipline(input_data) except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: LOGGER.debug("Try to re-launch the discipline as timeout is reached.") current_error = time_out_error @@ -147,6 +147,7 @@ class RetryDiscipline(Discipline): current_error = err stop = False + self.__discipline.local_data.update(output_data) if stop: break time.sleep(self.wait_time) @@ -161,7 +162,7 @@ class RetryDiscipline(Discipline): LOGGER.debug("Transfer output data in RetryDiscipline local data.") return self.__discipline.get_output_data() - def _execute_discipline(self, input_data: StrKeyMapping) -> None: + def _execute_discipline(self, input_data: StrKeyMapping) -> StrKeyMapping: """Execute the discipline. Args: @@ -177,10 +178,7 @@ class RetryDiscipline(Discipline): try: LOGGER.debug("Running discipline and get the output") - - process_results = run_discipline.result(timeout=self.timeout) - self.__discipline.local_data.update(process_results) - + output_data = run_discipline.result(timeout=self.timeout) except (cfutures._base.TimeoutError, TimeoutError): # Runtime exceeds timeout, killing process and its children. 
@@ -206,3 +204,5 @@ class RetryDiscipline(Discipline): raise LOGGER.debug("\n****** Exiting _execute_discipline *****:") + + return output_data -- GitLab From c49fb09ce1f20e7a2fa8a240be8316e46e456efe Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 22 Jan 2025 14:30:58 +0100 Subject: [PATCH 40/58] refactor: update output_data location in _run --- src/gemseo/disciplines/wrappers/retry_discipline.py | 4 +--- tests/disciplines/wrappers/test_retry_discipline.py | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index f6815c7e3b..d905a1872e 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -116,7 +116,6 @@ class RetryDiscipline(Discipline): for n_try in range(1, self.n_retry + 1): stop = True self.n_executions += 1 - output_data = {} LOGGER.debug( "Trying to run the discipline. Attempt %d/%d", n_try, self.n_retry @@ -147,7 +146,6 @@ class RetryDiscipline(Discipline): current_error = err stop = False - self.__discipline.local_data.update(output_data) if stop: break time.sleep(self.wait_time) @@ -160,7 +158,7 @@ class RetryDiscipline(Discipline): raise current_error LOGGER.debug("Transfer output data in RetryDiscipline local data.") - return self.__discipline.get_output_data() + return output_data def _execute_discipline(self, input_data: StrKeyMapping) -> StrKeyMapping: """Execute the discipline. 
diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 2b5780fa86..b449a8385f 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -64,10 +64,10 @@ def a_long_time_running_discipline() -> Discipline: class CrashingDisciplineInRun(Discipline): - """A discipline rising NotImplementedError in ``_run``.""" + """A discipline raising NotImplementedError in ``_run``.""" def __init__(self, name: str = "", a_string=None) -> None: - """A discipline rising NotImplementedError in _run.""" + """A discipline raising NotImplementedError in _run.""" super().__init__(name=name) self.a_string = a_string -- GitLab From a753035c677bcebd4e72ed8af66afaceff003488 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Wed, 22 Jan 2025 14:53:29 +0000 Subject: [PATCH 41/58] refactor: apply suggested change during review --- .../disciplines/wrappers/retry_discipline.py | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index d905a1872e..4a470ce6fa 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -112,9 +112,7 @@ class RetryDiscipline(Discipline): current_error = None self.n_executions = 0 - stop = True for n_try in range(1, self.n_retry + 1): - stop = True self.n_executions += 1 LOGGER.debug( @@ -128,28 +126,27 @@ class RetryDiscipline(Discipline): output_data = self.__discipline.execute(input_data) else: output_data = self._execute_discipline(input_data) + except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: LOGGER.debug("Try to re-launch the discipline as timeout is reached.") current_error = time_out_error - time.sleep(self.wait_time) - stop = False - continue - except Exception as err: # 
noqa: BLE001 - if isinstance(err, self.fatal_exceptions): + + except Exception as error: # noqa: BLE001 + if isinstance(error, self.fatal_exceptions): LOGGER.info( "Failed to execute discipline %s, " "aborting retry because of the exception type %s.", self.__discipline.name, - str(type(err)), + type(error), ) raise - - current_error = err - stop = False - if stop: + current_error = error + else: break + time.sleep(self.wait_time) - if not stop and current_error is not None: + + else: LOGGER.error( "Failed to execute discipline %s after %d attempts.", self.__discipline.name, @@ -197,8 +194,8 @@ class RetryDiscipline(Discipline): ) raise - except Exception as err: # noqa: BLE001 - LOGGER.debug(type(err)) + except Exception as error: # noqa: BLE001 + LOGGER.debug(type(error)) raise LOGGER.debug("\n****** Exiting _execute_discipline *****:") -- GitLab From d64d8802bfa984242cd61f3a771adf2a95119afd Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 22 Jan 2025 15:57:04 +0100 Subject: [PATCH 42/58] docs: added a Returns section, removed a Logger line --- src/gemseo/disciplines/wrappers/retry_discipline.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 4a470ce6fa..4432115513 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -154,7 +154,6 @@ class RetryDiscipline(Discipline): ) raise current_error - LOGGER.debug("Transfer output data in RetryDiscipline local data.") return output_data def _execute_discipline(self, input_data: StrKeyMapping) -> StrKeyMapping: @@ -162,6 +161,9 @@ class RetryDiscipline(Discipline): Args: input_data: The input data passed to the discipline. + + Returns: + output_data : The output returned by the discipline. 
""" LOGGER.debug("time_out set to : %s s", self.timeout) -- GitLab From 838e315e74d04fc20a7263a04dc0def580471677 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Wed, 22 Jan 2025 15:42:58 +0000 Subject: [PATCH 43/58] refactor: removed useless variable, and modified the Returns comment --- src/gemseo/disciplines/wrappers/retry_discipline.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 4432115513..b23497e2a7 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -109,7 +109,6 @@ class RetryDiscipline(Discipline): self.n_executions = 0 def _run(self, input_data: StrKeyMapping) -> StrKeyMapping | None: - current_error = None self.n_executions = 0 for n_try in range(1, self.n_retry + 1): @@ -163,7 +162,7 @@ class RetryDiscipline(Discipline): input_data: The input data passed to the discipline. Returns: - output_data : The output returned by the discipline. + The output returned by the discipline. 
""" LOGGER.debug("time_out set to : %s s", self.timeout) -- GitLab From 3776a8c2a7fb83614da837b8e683d9d9732b2ef9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Wed, 22 Jan 2025 16:26:49 +0000 Subject: [PATCH 44/58] refactor: simplify method _execute_discipline --- src/gemseo/disciplines/wrappers/retry_discipline.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index b23497e2a7..92c4f24ff6 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -174,7 +174,7 @@ class RetryDiscipline(Discipline): try: LOGGER.debug("Running discipline and get the output") - output_data = run_discipline.result(timeout=self.timeout) + return run_discipline.result(timeout=self.timeout) except (cfutures._base.TimeoutError, TimeoutError): # Runtime exceeds timeout, killing process and its children. 
@@ -200,5 +200,3 @@ class RetryDiscipline(Discipline): raise LOGGER.debug("\n****** Exiting _execute_discipline *****:") - - return output_data -- GitLab From 3d0dce026b72ea7afb56ba458a0d5f78f3f83d03 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 22 Jan 2025 17:30:05 +0100 Subject: [PATCH 45/58] refactor: removed logger info --- src/gemseo/disciplines/wrappers/retry_discipline.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 92c4f24ff6..2457021f2e 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -198,5 +198,3 @@ class RetryDiscipline(Discipline): except Exception as error: # noqa: BLE001 LOGGER.debug(type(error)) raise - - LOGGER.debug("\n****** Exiting _execute_discipline *****:") -- GitLab From b1f88da5bf862292a10e3b2ce98fc63d674f42af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Wed, 22 Jan 2025 16:41:25 +0000 Subject: [PATCH 46/58] refactor: apply suggestion but adding an "else" statement --- .../disciplines/wrappers/retry_discipline.py | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 2457021f2e..97de8e9628 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -122,9 +122,9 @@ class RetryDiscipline(Discipline): # If no timeout set, normal execution inside the "retry" loop # else enclose the discipline in ProcessPoolExecutor to run it. 
if math.isinf(self.timeout): - output_data = self.__discipline.execute(input_data) + return self.__discipline.execute(input_data) else: - output_data = self._execute_discipline(input_data) + return self._execute_discipline(input_data) except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: LOGGER.debug("Try to re-launch the discipline as timeout is reached.") @@ -140,20 +140,15 @@ class RetryDiscipline(Discipline): ) raise current_error = error - else: - break time.sleep(self.wait_time) - else: - LOGGER.error( - "Failed to execute discipline %s after %d attempts.", - self.__discipline.name, - self.n_retry, - ) - raise current_error - - return output_data + LOGGER.error( + "Failed to execute discipline %s after %d attempts.", + self.__discipline.name, + self.n_retry, + ) + raise current_error def _execute_discipline(self, input_data: StrKeyMapping) -> StrKeyMapping: """Execute the discipline. -- GitLab From b82d7236ac4736b89a6d3b14706d4d6098e3e648 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Wed, 22 Jan 2025 17:45:57 +0100 Subject: [PATCH 47/58] fix: remove the else statement --- src/gemseo/disciplines/wrappers/retry_discipline.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 97de8e9628..dc353da90b 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -123,8 +123,7 @@ class RetryDiscipline(Discipline): # else enclose the discipline in ProcessPoolExecutor to run it. 
if math.isinf(self.timeout): return self.__discipline.execute(input_data) - else: - return self._execute_discipline(input_data) + return self._execute_discipline(input_data) except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: LOGGER.debug("Try to re-launch the discipline as timeout is reached.") -- GitLab From d2861211c249f0cbede5760ba09c6f044ce7a238 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Thu, 23 Jan 2025 09:30:19 +0000 Subject: [PATCH 48/58] docs: modified some comments and logger output --- .../disciplines/wrappers/retry_discipline.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index dc353da90b..ca55559c6d 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -85,7 +85,7 @@ class RetryDiscipline(Discipline): discipline: The discipline to wrap in the retry loop. n_retry: The number of retry of the discipline. wait_time: The time to wait between 2 retries (in seconds). - timeout : The maximum duration, in seconds, that the discipline is + timeout: The maximum duration, in seconds, that the discipline is allowed to run. If this time limit is exceeded, the execution is terminated. If ``math.inf``, the discipline is executed without timeout limit. @@ -115,7 +115,7 @@ class RetryDiscipline(Discipline): self.n_executions += 1 LOGGER.debug( - "Trying to run the discipline. 
Attempt %d/%d", n_try, self.n_retry + "Trying to execute the discipline: attempt %d/%d", n_try, self.n_retry ) try: @@ -126,7 +126,7 @@ class RetryDiscipline(Discipline): return self._execute_discipline(input_data) except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: - LOGGER.debug("Try to re-launch the discipline as timeout is reached.") + LOGGER.debug("Timeout reached during the execution of discipline %s", self.__discipline.name) current_error = time_out_error except Exception as error: # noqa: BLE001 @@ -150,7 +150,7 @@ class RetryDiscipline(Discipline): raise current_error def _execute_discipline(self, input_data: StrKeyMapping) -> StrKeyMapping: - """Execute the discipline. + """Execute the discipline with a timeout. Args: input_data: The input data passed to the discipline. @@ -158,7 +158,7 @@ class RetryDiscipline(Discipline): Returns: The output returned by the discipline. """ - LOGGER.debug("time_out set to : %s s", self.timeout) + LOGGER.debug("Executing discipline %s with a timeout of %s s", self.__discipline.name, self.timeout) with ProcessPoolExecutor() as executor: run_discipline = executor.submit( @@ -167,16 +167,16 @@ class RetryDiscipline(Discipline): ) try: - LOGGER.debug("Running discipline and get the output") return run_discipline.result(timeout=self.timeout) + except (cfutures._base.TimeoutError, TimeoutError): # Runtime exceeds timeout, killing process and its children. LOGGER.debug("TimeoutError: Time out waiting for result()") - # abort discipline execution immediately: shutdown + kill children - # killing children is mandatory, else the program terminates only - # when timeout is reached + # Abort the discipline execution immediately: shutdown + kill children. + # Killing the children is mandatory, otherwise the program terminates only + # when timeout is reached. 
pid_child = [p.pid for p in executor._processes.values()] executor.shutdown(wait=False, cancel_futures=True) -- GitLab From b405af85cc6b13fe826d9f99646fd77a193f3ea8 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Thu, 23 Jan 2025 10:38:21 +0100 Subject: [PATCH 49/58] docs: removed and modify some comments and logger output --- .../disciplines/wrappers/retry_discipline.py | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index ca55559c6d..867d99597a 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -119,14 +119,15 @@ class RetryDiscipline(Discipline): ) try: - # If no timeout set, normal execution inside the "retry" loop - # else enclose the discipline in ProcessPoolExecutor to run it. if math.isinf(self.timeout): return self.__discipline.execute(input_data) return self._execute_discipline(input_data) except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: - LOGGER.debug("Timeout reached during the execution of discipline %s", self.__discipline.name) + LOGGER.debug( + "Timeout reached during the execution of discipline %s", + self.__discipline.name, + ) current_error = time_out_error except Exception as error: # noqa: BLE001 @@ -158,7 +159,11 @@ class RetryDiscipline(Discipline): Returns: The output returned by the discipline. """ - LOGGER.debug("Executing discipline %s with a timeout of %s s", self.__discipline.name, self.timeout) + LOGGER.debug( + "Executing discipline %s with a timeout of %s s", + self.__discipline.name, + self.timeout, + ) with ProcessPoolExecutor() as executor: run_discipline = executor.submit( @@ -171,12 +176,11 @@ class RetryDiscipline(Discipline): except (cfutures._base.TimeoutError, TimeoutError): # Runtime exceeds timeout, killing process and its children. 
- - LOGGER.debug("TimeoutError: Time out waiting for result()") - + # # Abort the discipline execution immediately: shutdown + kill children. - # Killing the children is mandatory, otherwise the program terminates only - # when timeout is reached. + # Killing the children is mandatory, otherwise the program terminates + # only when timeout is reached. + pid_child = [p.pid for p in executor._processes.values()] executor.shutdown(wait=False, cancel_futures=True) -- GitLab From 720400cd0f4e26e194ee01090cd727c7a500adac Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Thu, 23 Jan 2025 11:37:38 +0100 Subject: [PATCH 50/58] feat: add property n_executions to the class, and changed self.n_executions to private self.__n_executions --- src/gemseo/disciplines/wrappers/retry_discipline.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 867d99597a..07d7801fe8 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -56,7 +56,7 @@ class RetryDiscipline(Discipline): - a ``TimeoutError`` raised sooner than the ``timeout`` value set by the user. 
""" - n_executions: int + __n_executions: int """The number of performed executions of the discipline.""" n_retry: int @@ -106,13 +106,18 @@ class RetryDiscipline(Discipline): self.wait_time = wait_time self.timeout = timeout self.fatal_exceptions = fatal_exceptions - self.n_executions = 0 + self.__n_executions = 0 + + @property + def n_executions(self) -> int: + """The number of times the discipline has been retried during execution.""" + return self.__n_executions def _run(self, input_data: StrKeyMapping) -> StrKeyMapping | None: - self.n_executions = 0 + self.__n_executions = 0 for n_try in range(1, self.n_retry + 1): - self.n_executions += 1 + self.__n_executions += 1 LOGGER.debug( "Trying to execute the discipline: attempt %d/%d", n_try, self.n_retry -- GitLab From 389f86745f52a3fa5f38e3055f770de91b332d81 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Thu, 23 Jan 2025 14:37:28 +0100 Subject: [PATCH 51/58] refactor: apply suggested change during last review --- src/gemseo/disciplines/wrappers/retry_discipline.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 07d7801fe8..8777c627d3 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -59,6 +59,9 @@ class RetryDiscipline(Discipline): __n_executions: int """The number of performed executions of the discipline.""" + __time_out_errors = (TimeoutError, cfutures.TimeoutError) + """Tuple of possible timeout errors that can be raised during execution.""" + n_retry: int """The number of retry of the discipline.""" @@ -128,7 +131,7 @@ class RetryDiscipline(Discipline): return self.__discipline.execute(input_data) return self._execute_discipline(input_data) - except (TimeoutError, cfutures._base.TimeoutError) as time_out_error: + except self.__time_out_errors as time_out_error: LOGGER.debug( "Timeout 
reached during the execution of discipline %s", self.__discipline.name, @@ -179,13 +182,10 @@ class RetryDiscipline(Discipline): try: return run_discipline.result(timeout=self.timeout) - except (cfutures._base.TimeoutError, TimeoutError): - # Runtime exceeds timeout, killing process and its children. - # + except self.__time_out_errors: # Abort the discipline execution immediately: shutdown + kill children. # Killing the children is mandatory, otherwise the program terminates # only when timeout is reached. - pid_child = [p.pid for p in executor._processes.values()] executor.shutdown(wait=False, cancel_futures=True) -- GitLab From b4ef9448bcb14df8fadded3008080baed5e2a427 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Thu, 23 Jan 2025 15:33:53 +0100 Subject: [PATCH 52/58] refactor: renamed private variable and modified some comments --- .../disciplines/wrappers/retry_discipline.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 8777c627d3..9505b0002f 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -24,6 +24,7 @@ import time from concurrent.futures import ProcessPoolExecutor from logging import getLogger from typing import TYPE_CHECKING +from typing import ClassVar from gemseo.core.discipline import Discipline @@ -59,8 +60,11 @@ class RetryDiscipline(Discipline): __n_executions: int """The number of performed executions of the discipline.""" - __time_out_errors = (TimeoutError, cfutures.TimeoutError) - """Tuple of possible timeout errors that can be raised during execution.""" + __time_out_exceptions: ClassVar[tuple[type[Exception], ...]] = ( + TimeoutError, + cfutures.TimeoutError, + ) + """The possible timeout exceptions that can be raised during execution.""" n_retry: int """The number of retry of the discipline.""" @@ -131,7 
+135,7 @@ class RetryDiscipline(Discipline): return self.__discipline.execute(input_data) return self._execute_discipline(input_data) - except self.__time_out_errors as time_out_error: + except self.__time_out_exceptions as time_out_error: LOGGER.debug( "Timeout reached during the execution of discipline %s", self.__discipline.name, @@ -182,10 +186,9 @@ class RetryDiscipline(Discipline): try: return run_discipline.result(timeout=self.timeout) - except self.__time_out_errors: - # Abort the discipline execution immediately: shutdown + kill children. - # Killing the children is mandatory, otherwise the program terminates - # only when timeout is reached. + except self.__time_out_exceptions: + # Killing the children is mandatory to abort the discipline execution + # immediately: shutdown + kill children. pid_child = [p.pid for p in executor._processes.values()] executor.shutdown(wait=False, cancel_futures=True) -- GitLab From aa4e73012c5f3d4cdf4d6d3ee29c77bca9a16407 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Fran=C3=A7ois=20Figu=C3=A9?= Date: Fri, 24 Jan 2025 09:45:08 +0000 Subject: [PATCH 53/58] refactor: removed useless lines --- .../wrappers/test_retry_discipline.py | 29 +------------------ 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index b449a8385f..2aae580d95 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -12,11 +12,6 @@ # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, write to the Free Software Foundation, # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
-# Contributors: -# INITIAL AUTHORS - initial API and implementation and/or -# initial documentation -# :author: Francois Gallard -# OTHER AUTHORS - MACROSCOPIC CHANGES """Tests for retry discipline.""" from __future__ import annotations @@ -38,7 +33,6 @@ if TYPE_CHECKING: from gemseo.typing import StrKeyMapping -LOGGER = getLogger(__name__) @pytest.fixture @@ -66,26 +60,14 @@ def a_long_time_running_discipline() -> Discipline: class CrashingDisciplineInRun(Discipline): """A discipline raising NotImplementedError in ``_run``.""" - def __init__(self, name: str = "", a_string=None) -> None: - """A discipline raising NotImplementedError in _run.""" - super().__init__(name=name) - self.a_string = a_string - def _run(self, input_data: StrKeyMapping): - """The run method.""" raise NotImplementedError class DisciplineLongTimeRunning(Discipline): """A discipline that could run for a while, to test the timeout feature.""" - def __init__(self, name: str = "", a_string=None) -> None: - """A discipline that could run for a while.""" - super().__init__(name=name) - self.a_string = a_string - def _run(self, input_data: StrKeyMapping) -> None: - """The run method.""" x = 0.0 for i in range(300): for j in range(1000): @@ -111,7 +93,7 @@ def test_retry_discipline_with_timeout(an_analytic_discipline) -> None: assert disc_with_timeout.n_executions == 1 -@pytest.mark.parametrize("n_retry", [1, 2, 3]) +@pytest.mark.parametrize("n_retry", [1, 3]) def test_failure_retry_discipline_with_timeout(an_analytic_discipline, n_retry) -> None: """Test failure of the discipline with a too much very short timeout.""" disc_with_timeout = RetryDiscipline( @@ -120,7 +102,6 @@ def test_failure_retry_discipline_with_timeout(an_analytic_discipline, n_retry) with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: disc_with_timeout.execute({"x": array([4.0])}) - assert timeout_exec.typename == "TimeoutError" assert disc_with_timeout.n_executions == n_retry @@ -133,8 +114,6 @@ def 
test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: with pytest.raises(ZeroDivisionError) as err: disc.execute({"x": array([0.0])}) - assert err.type is ZeroDivisionError - assert err.typename == "ZeroDivisionError" assert str(err.value) == "float division by zero" @@ -165,9 +144,6 @@ def test_failure_zero_division_error_with_timeout( disc.execute({"x": array([0.0])}) assert disc.n_executions == 1 - assert err.type is ZeroDivisionError - assert err.typename == "ZeroDivisionError" - assert str(err.value) == "float division by zero" def test_a_not_implemented_error_analytic_discipline( @@ -192,8 +168,6 @@ def test_a_not_implemented_error_analytic_discipline( retry_discipline.execute({"x": array([1.0])}) assert retry_discipline.n_executions == 1 - assert err.type is NotImplementedError - assert err.typename == "NotImplementedError" def test_retry_discipline_timeout_feature(a_long_time_running_discipline) -> None: @@ -206,6 +180,5 @@ def test_retry_discipline_timeout_feature(a_long_time_running_discipline) -> Non with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: disc_with_timeout.execute({"x": array([0.0])}) - assert timeout_exec.typename == "TimeoutError" assert disc_with_timeout.n_executions == n_retry assert disc_with_timeout.local_data == {} -- GitLab From 518abae751bb28ea3756aadab13bbb57bfc9a23e Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Fri, 24 Jan 2025 15:32:12 +0100 Subject: [PATCH 54/58] refactor: modify to only use TimeoutError exception when a timeout is reached. 
Modify some pytest.raise and assert lines --- .../disciplines/wrappers/retry_discipline.py | 4 +- .../wrappers/test_retry_discipline.py | 44 +++++++------------ 2 files changed, 19 insertions(+), 29 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index 9505b0002f..02a8b338c9 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -135,12 +135,12 @@ class RetryDiscipline(Discipline): return self.__discipline.execute(input_data) return self._execute_discipline(input_data) - except self.__time_out_exceptions as time_out_error: + except self.__time_out_exceptions: LOGGER.debug( "Timeout reached during the execution of discipline %s", self.__discipline.name, ) - current_error = time_out_error + current_error = TimeoutError except Exception as error: # noqa: BLE001 if isinstance(error, self.fatal_exceptions): diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 2aae580d95..74be329fdf 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -16,9 +16,8 @@ from __future__ import annotations -import concurrent.futures as cfutures import math -from logging import getLogger +import time from typing import TYPE_CHECKING import pytest @@ -34,7 +33,6 @@ if TYPE_CHECKING: from gemseo.typing import StrKeyMapping - @pytest.fixture def an_analytic_discipline() -> Discipline: """Analytic discipline.""" @@ -49,7 +47,7 @@ def a_crashing_analytic_discipline() -> Discipline: @pytest.fixture def a_crashing_discipline_in_run() -> Discipline: - return CrashingDisciplineInRun(name="Crash_run", a_string="Hello") + return CrashingDisciplineInRun(name="Crash_run") @pytest.fixture @@ -68,29 +66,17 @@ class DisciplineLongTimeRunning(Discipline): """A discipline that could run for a while, to test the 
timeout feature.""" def _run(self, input_data: StrKeyMapping) -> None: - x = 0.0 - for i in range(300): - for j in range(1000): - for _k in range(1000): - x = x + math.cos(i) * math.sin(j) + time.sleep(300.0) -def test_retry_discipline(an_analytic_discipline) -> None: +@pytest.mark.parametrize("timeout", [math.inf, 10.0]) +def test_retry_discipline(an_analytic_discipline, timeout) -> None: """Test discipline, no timeout set.""" - retry_discipline = RetryDiscipline(an_analytic_discipline) + retry_discipline = RetryDiscipline(an_analytic_discipline, timeout=timeout) retry_discipline.execute({"x": array([4.0])}) - assert retry_discipline.local_data == {"x": array([4.0]), "y": array([9.0])} assert retry_discipline.n_executions == 1 - - -def test_retry_discipline_with_timeout(an_analytic_discipline) -> None: - """Test discipline with a timeout.""" - disc_with_timeout = RetryDiscipline(an_analytic_discipline, timeout=10) - disc_with_timeout.execute({"x": array([4.0])}) - - assert disc_with_timeout.local_data == {"x": array([4.0]), "y": array([9.0])} - assert disc_with_timeout.n_executions == 1 + assert retry_discipline.local_data == {"x": array([4.0]), "y": array([9.0])} @pytest.mark.parametrize("n_retry", [1, 3]) @@ -99,10 +85,11 @@ def test_failure_retry_discipline_with_timeout(an_analytic_discipline, n_retry) disc_with_timeout = RetryDiscipline( an_analytic_discipline, timeout=1e-4, n_retry=n_retry ) - with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: + with pytest.raises(TimeoutError, match=""): disc_with_timeout.execute({"x": array([4.0])}) assert disc_with_timeout.n_executions == n_retry + assert disc_with_timeout.local_data == {"x": array([4.0])} def test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: @@ -111,10 +98,11 @@ def test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: In order to catch the ZeroDivisionError, set n_retry=1 """ disc = RetryDiscipline(a_crashing_analytic_discipline, n_retry=1) - with 
pytest.raises(ZeroDivisionError) as err: + with pytest.raises(ZeroDivisionError, match="") as err: disc.execute({"x": array([0.0])}) assert str(err.value) == "float division by zero" + assert disc.local_data == {"x": array([0.0])} @pytest.mark.parametrize( @@ -140,10 +128,11 @@ def test_failure_zero_division_error_with_timeout( timeout=5.0, fatal_exceptions=tuple_errors, ) - with pytest.raises(ZeroDivisionError) as err: + with pytest.raises(ZeroDivisionError): disc.execute({"x": array([0.0])}) assert disc.n_executions == 1 + assert disc.local_data == {"x": array([0.0])} def test_a_not_implemented_error_analytic_discipline( @@ -164,10 +153,11 @@ def test_a_not_implemented_error_analytic_discipline( NotImplementedError, ), ) - with pytest.raises(NotImplementedError) as err: + with pytest.raises(NotImplementedError): retry_discipline.execute({"x": array([1.0])}) assert retry_discipline.n_executions == 1 + assert retry_discipline.local_data == {} def test_retry_discipline_timeout_feature(a_long_time_running_discipline) -> None: @@ -175,9 +165,9 @@ def test_retry_discipline_timeout_feature(a_long_time_running_discipline) -> Non n_retry = 1 disc_with_timeout = RetryDiscipline( - a_long_time_running_discipline, timeout=3.0, n_retry=n_retry + a_long_time_running_discipline, timeout=2.0, n_retry=n_retry ) - with pytest.raises(cfutures._base.TimeoutError) as timeout_exec: + with pytest.raises(TimeoutError): disc_with_timeout.execute({"x": array([0.0])}) assert disc_with_timeout.n_executions == n_retry -- GitLab From 47074ffda6d121cfa04360ecf4efa562862f829e Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Tue, 28 Jan 2025 14:35:30 +0100 Subject: [PATCH 55/58] tests: modify analytical discipline + some remarks from last review --- .../disciplines/wrappers/retry_discipline.py | 2 +- .../wrappers/test_retry_discipline.py | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py 
b/src/gemseo/disciplines/wrappers/retry_discipline.py index 02a8b338c9..ff165b000b 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -60,7 +60,7 @@ class RetryDiscipline(Discipline): __n_executions: int """The number of performed executions of the discipline.""" - __time_out_exceptions: ClassVar[tuple[type[Exception], ...]] = ( + __time_out_exceptions: ClassVar[set[type[Exception], ...]] = ( TimeoutError, cfutures.TimeoutError, ) diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 74be329fdf..b1e3dccc06 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -36,13 +36,13 @@ if TYPE_CHECKING: @pytest.fixture def an_analytic_discipline() -> Discipline: """Analytic discipline.""" - return create_discipline("AnalyticDiscipline", expressions={"y": "2*x+1"}) + return create_discipline("AnalyticDiscipline", expressions={"y": "x"}) @pytest.fixture def a_crashing_analytic_discipline() -> Discipline: """Analytic discipline crashing when x=0.""" - return create_discipline("AnalyticDiscipline", expressions={"y": "1.0/x+1"}) + return create_discipline("AnalyticDiscipline", expressions={"y": "1.0/x"}) @pytest.fixture @@ -66,7 +66,7 @@ class DisciplineLongTimeRunning(Discipline): """A discipline that could run for a while, to test the timeout feature.""" def _run(self, input_data: StrKeyMapping) -> None: - time.sleep(300.0) + time.sleep(5.0) @pytest.mark.parametrize("timeout", [math.inf, 10.0]) @@ -76,7 +76,7 @@ def test_retry_discipline(an_analytic_discipline, timeout) -> None: retry_discipline.execute({"x": array([4.0])}) assert retry_discipline.n_executions == 1 - assert retry_discipline.local_data == {"x": array([4.0]), "y": array([9.0])} + assert retry_discipline.local_data == {"x": array([4.0]), "y": array([4.0])} @pytest.mark.parametrize("n_retry", [1, 3]) 
@@ -106,7 +106,7 @@ def test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: @pytest.mark.parametrize( - "tuple_errors", + "fatal_exceptions", [ (ZeroDivisionError,), (ZeroDivisionError, FloatingPointError, OverflowError), @@ -115,7 +115,9 @@ def test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: ) @pytest.mark.parametrize("n_try", [1, 3]) def test_failure_zero_division_error_with_timeout( - n_try: int, tuple_errors: Iterable[type[Exception]], a_crashing_analytic_discipline + n_try: int, + fatal_exceptions: Iterable[type[Exception]], + a_crashing_analytic_discipline, ) -> None: """Test failure of the discipline with timeout and a bad x entry. @@ -126,7 +128,7 @@ def test_failure_zero_division_error_with_timeout( a_crashing_analytic_discipline, n_retry=n_try, timeout=5.0, - fatal_exceptions=tuple_errors, + fatal_exceptions=fatal_exceptions, ) with pytest.raises(ZeroDivisionError): disc.execute({"x": array([0.0])}) -- GitLab From 0d68b7dee618701b9836e85d529155082d4af305 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Tue, 28 Jan 2025 17:10:43 +0100 Subject: [PATCH 56/58] test: add the check of n_executions value in one test --- tests/disciplines/wrappers/test_retry_discipline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index b1e3dccc06..051faa220d 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -103,6 +103,7 @@ def test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: assert str(err.value) == "float division by zero" assert disc.local_data == {"x": array([0.0])} + assert disc.n_executions == 1 @pytest.mark.parametrize( -- GitLab From 6b390f8c2b3300e42263ed318bf7c3996f2381d3 Mon Sep 17 00:00:00 2001 From: "jean-francois.figue" Date: Fri, 31 Jan 2025 16:43:21 +0100 Subject: [PATCH 57/58] refactor: modify 
tests by adding more checks. Modify message raises when Timeout error occurs --- .../disciplines/wrappers/retry_discipline.py | 15 ++-- .../wrappers/test_retry_discipline.py | 82 ++++++++++++++++--- 2 files changed, 78 insertions(+), 19 deletions(-) diff --git a/src/gemseo/disciplines/wrappers/retry_discipline.py b/src/gemseo/disciplines/wrappers/retry_discipline.py index ff165b000b..d7d617713d 100644 --- a/src/gemseo/disciplines/wrappers/retry_discipline.py +++ b/src/gemseo/disciplines/wrappers/retry_discipline.py @@ -60,7 +60,7 @@ class RetryDiscipline(Discipline): __n_executions: int """The number of performed executions of the discipline.""" - __time_out_exceptions: ClassVar[set[type[Exception], ...]] = ( + __time_out_exceptions: ClassVar[tuple[type[Exception], ...]] = ( TimeoutError, cfutures.TimeoutError, ) @@ -136,11 +136,12 @@ class RetryDiscipline(Discipline): return self._execute_discipline(input_data) except self.__time_out_exceptions: - LOGGER.debug( - "Timeout reached during the execution of discipline %s", - self.__discipline.name, + msg = ( + "Timeout reached during the execution of " + f"discipline {self.__discipline.name}" ) - current_error = TimeoutError + LOGGER.debug(msg) + current_error = TimeoutError(msg) except Exception as error: # noqa: BLE001 if isinstance(error, self.fatal_exceptions): @@ -155,10 +156,12 @@ class RetryDiscipline(Discipline): time.sleep(self.wait_time) + plural_suffix = "s" if self.n_retry > 1 else "" LOGGER.error( - "Failed to execute discipline %s after %d attempts.", + "Failed to execute discipline %s after %d attempt%s.", self.__discipline.name, self.n_retry, + plural_suffix, ) raise current_error diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 051faa220d..363f1c0ba1 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -26,6 +26,7 @@ from numpy import array from gemseo 
import create_discipline from gemseo.core.discipline import Discipline from gemseo.disciplines.wrappers.retry_discipline import RetryDiscipline +from gemseo.utils.timer import Timer if TYPE_CHECKING: from collections.abc import Iterable @@ -59,7 +60,8 @@ class CrashingDisciplineInRun(Discipline): """A discipline raising NotImplementedError in ``_run``.""" def _run(self, input_data: StrKeyMapping): - raise NotImplementedError + msg = "Error: This method is not implemented." + raise NotImplementedError(msg) class DisciplineLongTimeRunning(Discipline): @@ -70,7 +72,7 @@ class DisciplineLongTimeRunning(Discipline): @pytest.mark.parametrize("timeout", [math.inf, 10.0]) -def test_retry_discipline(an_analytic_discipline, timeout) -> None: +def test_retry_discipline(an_analytic_discipline, timeout, caplog) -> None: """Test discipline, no timeout set.""" retry_discipline = RetryDiscipline(an_analytic_discipline, timeout=timeout) retry_discipline.execute({"x": array([4.0])}) @@ -78,33 +80,60 @@ def test_retry_discipline(an_analytic_discipline, timeout) -> None: assert retry_discipline.n_executions == 1 assert retry_discipline.local_data == {"x": array([4.0]), "y": array([4.0])} + assert caplog.text == "" + +@pytest.mark.parametrize("wait_time", [0.5, 1.0]) @pytest.mark.parametrize("n_retry", [1, 3]) -def test_failure_retry_discipline_with_timeout(an_analytic_discipline, n_retry) -> None: +def test_failure_retry_discipline_with_timeout( + an_analytic_discipline, n_retry, wait_time, caplog +) -> None: """Test failure of the discipline with a too much very short timeout.""" disc_with_timeout = RetryDiscipline( - an_analytic_discipline, timeout=1e-4, n_retry=n_retry + an_analytic_discipline, timeout=1e-4, n_retry=n_retry, wait_time=wait_time ) - with pytest.raises(TimeoutError, match=""): + + with ( + Timer() as timer, + pytest.raises( + TimeoutError, + match="Timeout reached during the execution" + " of discipline AnalyticDiscipline", + ), + ): disc_with_timeout.execute({"x": 
array([4.0])}) + elapsed_time = timer.elapsed_time + assert elapsed_time > 0.05 + (n_retry - 1) * wait_time + assert disc_with_timeout.n_executions == n_retry assert disc_with_timeout.local_data == {"x": array([4.0])} + assert "Process stopped as it exceeds timeout" in caplog.text + + plural_suffix = "s" if n_retry > 1 else "" + log_message = ( + f"Failed to execute discipline AnalyticDiscipline after {n_retry}" + f" attempt{plural_suffix}." + ) + assert log_message in caplog.text + -def test_failure_zero_division_error(a_crashing_analytic_discipline) -> None: +def test_failure_zero_division_error(a_crashing_analytic_discipline, caplog) -> None: """Test failure of the discipline with a bad x entry. In order to catch the ZeroDivisionError, set n_retry=1 """ disc = RetryDiscipline(a_crashing_analytic_discipline, n_retry=1) - with pytest.raises(ZeroDivisionError, match="") as err: + with pytest.raises(ZeroDivisionError, match="float division by zero"): disc.execute({"x": array([0.0])}) - assert str(err.value) == "float division by zero" assert disc.local_data == {"x": array([0.0])} assert disc.n_executions == 1 + log_message = "Failed to execute discipline AnalyticDiscipline after 1 attempt." + assert log_message in caplog.text + @pytest.mark.parametrize( "fatal_exceptions", @@ -119,6 +148,7 @@ def test_failure_zero_division_error_with_timeout( n_try: int, fatal_exceptions: Iterable[type[Exception]], a_crashing_analytic_discipline, + caplog, ) -> None: """Test failure of the discipline with timeout and a bad x entry. 
@@ -131,15 +161,21 @@ def test_failure_zero_division_error_with_timeout( timeout=5.0, fatal_exceptions=fatal_exceptions, ) - with pytest.raises(ZeroDivisionError): + with pytest.raises(ZeroDivisionError, match="float division by zero"): disc.execute({"x": array([0.0])}) assert disc.n_executions == 1 assert disc.local_data == {"x": array([0.0])} + log_message = ( + "Failed to execute discipline AnalyticDiscipline," + " aborting retry because of the exception type ." + ) + assert log_message in caplog.text + def test_a_not_implemented_error_analytic_discipline( - a_crashing_discipline_in_run, + a_crashing_discipline_in_run, caplog ) -> None: """Test discipline with a_crashing_discipline_in_run and a tuple of @@ -156,22 +192,42 @@ def test_a_not_implemented_error_analytic_discipline( NotImplementedError, ), ) - with pytest.raises(NotImplementedError): + with pytest.raises( + NotImplementedError, match="Error: This method is not implemented." + ): retry_discipline.execute({"x": array([1.0])}) assert retry_discipline.n_executions == 1 assert retry_discipline.local_data == {} + log_message = ( + "Failed to execute discipline Crash_run, aborting retry " + "because of the exception type ." 
+ ) + assert log_message in caplog.text + -def test_retry_discipline_timeout_feature(a_long_time_running_discipline) -> None: +def test_retry_discipline_timeout_feature( + a_long_time_running_discipline, caplog +) -> None: """Test the timeout feature of discipline with a long computation.""" n_retry = 1 disc_with_timeout = RetryDiscipline( a_long_time_running_discipline, timeout=2.0, n_retry=n_retry ) - with pytest.raises(TimeoutError): + with pytest.raises( + TimeoutError, + match="Timeout reached during the execution" + " of discipline DisciplineLongTimeRunning", + ): disc_with_timeout.execute({"x": array([0.0])}) assert disc_with_timeout.n_executions == n_retry assert disc_with_timeout.local_data == {} + + assert "Process stopped as it exceeds timeout" in caplog.text + log_message = ( + "Failed to execute discipline DisciplineLongTimeRunning after 1 attempt." + ) + assert log_message in caplog.text -- GitLab From b706b918d5a3cb1bb4f1d3657aaed2348103c607 Mon Sep 17 00:00:00 2001 From: Antoine Dechaume <7801279-AntoineD@users.noreply.gitlab.com> Date: Fri, 31 Jan 2025 19:52:04 +0000 Subject: [PATCH 58/58] fix: ruff --- tests/disciplines/wrappers/test_retry_discipline.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/disciplines/wrappers/test_retry_discipline.py b/tests/disciplines/wrappers/test_retry_discipline.py index 363f1c0ba1..5262ce0c1d 100644 --- a/tests/disciplines/wrappers/test_retry_discipline.py +++ b/tests/disciplines/wrappers/test_retry_discipline.py @@ -17,6 +17,7 @@ from __future__ import annotations import math +import re import time from typing import TYPE_CHECKING @@ -193,7 +194,7 @@ def test_a_not_implemented_error_analytic_discipline( ), ) with pytest.raises( - NotImplementedError, match="Error: This method is not implemented." + NotImplementedError, match=re.escape("Error: This method is not implemented.") ): retry_discipline.execute({"x": array([1.0])}) -- GitLab