diff --git a/src/gemseo/caches/hdf5_cache.py b/src/gemseo/caches/hdf5_cache.py
index 4bda080d27a4fd4b299a27cae302da71f75c7411..bcc46543e4af7896fa01bae4237beb274a64c759 100644
--- a/src/gemseo/caches/hdf5_cache.py
+++ b/src/gemseo/caches/hdf5_cache.py
@@ -22,6 +22,7 @@ from __future__ import annotations
 
 import logging
 from multiprocessing import RLock
+from multiprocessing.sharedctypes import Synchronized
 from pathlib import Path
 from typing import Any
 from typing import Generator
@@ -43,6 +44,9 @@ LOGGER = logging.getLogger(__name__)
 class HDF5Cache(AbstractFullCache):
     """Cache using disk HDF5 file to store the data."""
 
+    _ATTR_NOT_TO_SERIALIZE: tuple[str, ...] = ("lock_hashes", "lock")
+    """The attributes that shall be skipped at serialization."""
+
     def __init__(
         self,
         hdf_file_path: str | Path = "cache.hdf5",
@@ -97,17 +101,32 @@ class HDF5Cache(AbstractFullCache):
         msg.add("HDF node name: {}", self.__hdf_node_name)
         return str(msg)
 
-    def __getstate__(self):
-        # Pickle __init__ arguments so to call it when unpickling.
-        return dict(
-            tolerance=self.tolerance,
-            hdf_file_path=self.__hdf_file.hdf_file_path,
-            hdf_node_path=self.__hdf_node_name,
-            name=self.name,
-        )
+    def __getstate__(self) -> dict[str, Any]:
+        """Used by pickle to define what to serialize.
+
+        Returns:
+            The attributes to be serialized.
+        """
+        state = {}
+        for attribute_name in self.__dict__.keys() - self._ATTR_NOT_TO_SERIALIZE:
+            attribute_value = self.__dict__[attribute_name]
+            if isinstance(attribute_value, Synchronized):
+                # Do not serialize the shared memory object;
+                # store its plain value instead.
+                attribute_value = attribute_value.value
+            state[attribute_name] = attribute_value
+        state["lock_hashes"] = None  # RLock objects cannot be pickled.
+        state["lock"] = None  # They are dropped from the serialized state.
+
+        return state
 
-    def __setstate__(self, state):
-        self.__init__(**state)
+    def __setstate__(self, state: dict[str, Any]) -> None:
+        for attribute_name, attribute_value in state.items():
+            if isinstance(attribute_value, Synchronized):
+                # A shared memory object cannot be restored as such;
+                # keep its plain value instead.
+                attribute_value = attribute_value.value
+            self.__dict__[attribute_name] = attribute_value
 
     def _copy_empty_cache(self) -> HDF5Cache:
         file_path = Path(self.__hdf_file.hdf_file_path)
diff --git a/src/gemseo/caches/hdf5_file_singleton.py b/src/gemseo/caches/hdf5_file_singleton.py
index 0d8ae09aa3e9fe8ff57d136fd9bf14b169fbb99e..a503e4531dfa14216cba58802e5aa3ff89da920a 100644
--- a/src/gemseo/caches/hdf5_file_singleton.py
+++ b/src/gemseo/caches/hdf5_file_singleton.py
@@ -23,6 +23,7 @@ from __future__ import annotations
 from genericpath import exists
 from multiprocessing import RLock
 from pathlib import Path
+from typing import Any
 from typing import ClassVar
 
 import h5py
@@ -65,6 +66,9 @@ class HDF5FileSingleton(metaclass=SingleInstancePerFileAttribute):
     _INPUTS_GROUP: ClassVar[str] = AbstractFullCache._INPUTS_GROUP
     """The label for the input variables."""
 
+    _ATTR_NOT_TO_SERIALIZE: tuple[str, ...] = ("lock",)
+    """The attributes that shall be skipped at serialization."""
+
     def __init__(
         self,
         hdf_file_path: str,
@@ -362,3 +366,39 @@ class HDF5FileSingleton(metaclass=SingleInstancePerFileAttribute):
         data = {key: array(val) for key, val in data.items()}
         data_hash = array([hash_data_dict(data)], dtype="bytes")
         sample_value[cls.HASH_TAG][0] = data_hash
+
+    def __getstate__(self) -> dict[str, Any]:
+        """Used by pickle to define what to serialize.
+
+        Returns:
+            The attributes to be serialized.
+        """
+        state = {}
+        for attribute_name in self.__dict__.keys() - self._ATTR_NOT_TO_SERIALIZE:
+            attribute_value = self.__dict__[attribute_name]
+
+            # At this point, HDF5FileSingleton has no Synchronized attribute.
+            # If one is added in the future, the following check
+            # (and its counterpart in __setstate__) shall be uncommented.
+
+            # if isinstance(attribute_value, Synchronized):
+            #     # Do not serialize the shared memory object;
+            #     # store its plain value instead.
+            #     attribute_value = attribute_value.value
+            state[attribute_name] = attribute_value
+        state["lock"] = None  # An RLock cannot be pickled.
+
+        return state
+
+    def __setstate__(self, state: dict[str, Any]) -> None:
+        for attribute_name, attribute_value in state.items():
+
+            # At this point, HDF5FileSingleton has no Synchronized attribute.
+            # If one is added in the future, the following check
+            # (and its counterpart in __getstate__) shall be uncommented.
+
+            # if isinstance(attribute_value, Synchronized):
+            #     # A shared memory object cannot be restored as such;
+            #     # keep its plain value instead.
+            #     attribute_value = attribute_value.value
+            self.__dict__[attribute_name] = attribute_value