This commit is contained in:
Clarence Ng 2022-06-24 17:56:51 -07:00
parent f53a7f115b
commit 3c7f8154e9
3 changed files with 155 additions and 86 deletions

View file

@ -10,14 +10,15 @@ from ray.cloudpickle.cloudpickle_fast import CloudPickler, dump, dumps # noqa
# expose their Pickler subclass at top-level under the "Pickler" name.
Pickler = CloudPickler
__version__ = '2.0.0'
__version__ = "2.0.0"
def _warn_msg(obj, method, exc):
return (
f"{method}({str(obj)}) failed."
"\nTo check which non-serializable variables are captured "
"in scope, re-run the ray script with 'RAY_PICKLE_VERBOSE_DEBUG=1'.")
"in scope, re-run the ray script with 'RAY_PICKLE_VERBOSE_DEBUG=1'."
)
def dump_debug(obj, *args, **kwargs):
@ -26,6 +27,7 @@ def dump_debug(obj, *args, **kwargs):
except (TypeError, PicklingError) as exc:
if os.environ.get("RAY_PICKLE_VERBOSE_DEBUG"):
from ray.util.check_serialize import inspect_serializability
inspect_serializability(obj)
raise
else:
@ -39,6 +41,7 @@ def dumps_debug(obj, *args, **kwargs):
except (TypeError, PicklingError) as exc:
if os.environ.get("RAY_PICKLE_VERBOSE_DEBUG"):
from ray.util.check_serialize import inspect_serializability
inspect_serializability(obj)
raise
else:

View file

@ -970,13 +970,13 @@ def _typevar_reduce(obj):
def _get_bases(typ):
if '__orig_bases__' in getattr(typ, '__dict__', {}):
if "__orig_bases__" in getattr(typ, "__dict__", {}):
# For generic types (see PEP 560)
# Note that simply checking `hasattr(typ, '__orig_bases__')` is not
# correct. Subclasses of a fully-parameterized generic class do not
# have `__orig_bases__` defined, but `hasattr(typ, '__orig_bases__')`
# will return True because it's defined in the base class.
bases_attr = '__orig_bases__'
bases_attr = "__orig_bases__"
else:
# For regular class objects
bases_attr = "__bases__"

View file

@ -69,9 +69,7 @@ if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY:
Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure
compatibility with older versions of Python.
"""
CloudPickler(
file, protocol=protocol, buffer_callback=buffer_callback
).dump(obj)
CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj)
def dumps(obj, protocol=None, buffer_callback=None):
"""Serialize obj as a string of bytes allocated in memory
@ -84,9 +82,7 @@ if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY:
compatibility with older versions of Python.
"""
with io.BytesIO() as file:
cp = CloudPickler(
file, protocol=protocol, buffer_callback=buffer_callback
)
cp = CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback)
cp.dump(obj)
return file.getvalue()
@ -126,23 +122,37 @@ load, loads = pickle.load, pickle.loads
# COLLECTION OF OBJECTS __getnewargs__-LIKE METHODS
# -------------------------------------------------
def _class_getnewargs(obj):
    """Collect the constructor arguments for rebuilding the class ``obj``.

    The returned tuple is used as the argument tuple paired with
    ``_make_skeleton_class`` in ``_dynamic_class_reduce``.

    Args:
        obj: The class object being pickled.

    Returns:
        A 6-tuple ``(metaclass, name, bases, type_kwargs, tracker_id, None)``
        where ``type_kwargs`` carries ``__slots__`` and/or a ``__dict__``
        property when the class itself defines them.
    """
    type_kwargs = {}

    # Only forward __slots__ when it is declared on this class itself, not
    # merely inherited from a base class.
    if "__slots__" in obj.__dict__:
        type_kwargs["__slots__"] = obj.__slots__

    # A class may define ``__dict__`` as a property; keep it in that case.
    dict_attr = obj.__dict__.get("__dict__", None)
    if isinstance(dict_attr, property):
        type_kwargs["__dict__"] = dict_attr

    return (
        type(obj),
        obj.__name__,
        _get_bases(obj),
        type_kwargs,
        _get_or_create_tracker_id(obj),
        None,  # NOTE(review): meaning of this trailing slot is not visible here
    )
def _enum_getnewargs(obj):
    """Collect the constructor arguments for rebuilding the enum ``obj``.

    The returned tuple is used as the argument tuple paired with
    ``_make_skeleton_enum`` in ``_dynamic_class_reduce``.

    Args:
        obj: The enum class being pickled.

    Returns:
        A 7-tuple ``(bases, name, qualname, members, module, tracker_id,
        None)`` where ``members`` maps each member name to its value, in the
        order produced by iterating ``obj``.
    """
    members = {member.name: member.value for member in obj}
    return (
        obj.__bases__,
        obj.__name__,
        obj.__qualname__,
        members,
        obj.__module__,
        _get_or_create_tracker_id(obj),
        None,  # NOTE(review): meaning of this trailing slot is not visible here
    )
# COLLECTION OF OBJECTS RECONSTRUCTORS
@ -172,12 +182,12 @@ def _function_getstate(func):
}
f_globals_ref = _extract_code_globals(func.__code__)
f_globals = {k: func.__globals__[k] for k in f_globals_ref if k in
func.__globals__}
f_globals = {k: func.__globals__[k] for k in f_globals_ref if k in func.__globals__}
closure_values = (
list(map(_get_cell_contents, func.__closure__))
if func.__closure__ is not None else ()
if func.__closure__ is not None
else ()
)
# Extract currently-imported submodules used by func. Storing these modules
@ -185,7 +195,8 @@ def _function_getstate(func):
# trigger the side effect of importing these modules at unpickling time
# (which is necessary for func to work correctly once depickled)
slotstate["_cloudpickle_submodules"] = _find_imported_submodules(
func.__code__, itertools.chain(f_globals.values(), closure_values))
func.__code__, itertools.chain(f_globals.values(), closure_values)
)
slotstate["__globals__"] = f_globals
state = func.__dict__
@ -194,28 +205,27 @@ def _function_getstate(func):
def _class_getstate(obj):
clsdict = _extract_class_dict(obj)
clsdict.pop('__weakref__', None)
clsdict.pop("__weakref__", None)
if issubclass(type(obj), abc.ABCMeta):
# If obj is an instance of an ABCMeta subclass, don't pickle the
# cache/negative caches populated during isinstance/issubclass
# checks, but pickle the list of registered subclasses of obj.
clsdict.pop('_abc_cache', None)
clsdict.pop('_abc_negative_cache', None)
clsdict.pop('_abc_negative_cache_version', None)
clsdict.pop("_abc_cache", None)
clsdict.pop("_abc_negative_cache", None)
clsdict.pop("_abc_negative_cache_version", None)
# these are generated by some thirdparty libraries
clsdict.pop('_abc_generic_negative_cache', None)
clsdict.pop('_abc_generic_negative_cache_version', None)
clsdict.pop("_abc_generic_negative_cache", None)
clsdict.pop("_abc_generic_negative_cache_version", None)
registry = clsdict.pop('_abc_registry', None)
registry = clsdict.pop("_abc_registry", None)
if registry is None:
# in Python3.7+, the abc caches and registered subclasses of a
# class are bundled into the single _abc_impl attribute
clsdict.pop('_abc_impl', None)
clsdict.pop("_abc_impl", None)
(registry, _, _, _) = abc._get_dump(obj)
clsdict["_abc_impl"] = [subclass_weakref()
for subclass_weakref in registry]
clsdict["_abc_impl"] = [subclass_weakref() for subclass_weakref in registry]
else:
# In the above if clause, registry is a set of weakrefs -- in
# this case, registry is a WeakSet
@ -231,7 +241,7 @@ def _class_getstate(obj):
for k in obj.__slots__:
clsdict.pop(k, None)
clsdict.pop('__dict__', None) # unpicklable property object
clsdict.pop("__dict__", None) # unpicklable property object
return (clsdict, {})
@ -242,9 +252,13 @@ def _enum_getstate(obj):
members = dict((e.name, e.value) for e in obj)
# Cleanup the clsdict that will be passed to _rehydrate_skeleton_class:
# Those attributes are already handled by the metaclass.
for attrname in ["_generate_next_value_", "_member_names_",
"_member_map_", "_member_type_",
"_value2member_map_"]:
for attrname in [
"_generate_next_value_",
"_member_names_",
"_member_map_",
"_member_type_",
"_value2member_map_",
]:
clsdict.pop(attrname, None)
for member in members:
clsdict.pop(member)
@ -263,37 +277,68 @@ def _enum_getstate(obj):
# obj.__reduce__), some do not. The following methods were created to "fill
# these holes".
def _code_reduce(obj):
"""codeobject reducer"""
if hasattr(obj, "co_linetable"): # pragma: no branch
# Python 3.10 and later: obj.co_lnotab is deprecated and constructor
# expects obj.co_linetable instead.
args = (
obj.co_argcount, obj.co_posonlyargcount,
obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
obj.co_flags, obj.co_code, obj.co_consts, obj.co_names,
obj.co_varnames, obj.co_filename, obj.co_name,
obj.co_firstlineno, obj.co_linetable, obj.co_freevars,
obj.co_cellvars
obj.co_argcount,
obj.co_posonlyargcount,
obj.co_kwonlyargcount,
obj.co_nlocals,
obj.co_stacksize,
obj.co_flags,
obj.co_code,
obj.co_consts,
obj.co_names,
obj.co_varnames,
obj.co_filename,
obj.co_name,
obj.co_firstlineno,
obj.co_linetable,
obj.co_freevars,
obj.co_cellvars,
)
elif hasattr(obj, "co_posonlyargcount"):
# Backward compat for 3.9 and older
args = (
obj.co_argcount, obj.co_posonlyargcount,
obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize,
obj.co_flags, obj.co_code, obj.co_consts, obj.co_names,
obj.co_varnames, obj.co_filename, obj.co_name,
obj.co_firstlineno, obj.co_lnotab, obj.co_freevars,
obj.co_cellvars
obj.co_argcount,
obj.co_posonlyargcount,
obj.co_kwonlyargcount,
obj.co_nlocals,
obj.co_stacksize,
obj.co_flags,
obj.co_code,
obj.co_consts,
obj.co_names,
obj.co_varnames,
obj.co_filename,
obj.co_name,
obj.co_firstlineno,
obj.co_lnotab,
obj.co_freevars,
obj.co_cellvars,
)
else:
# Backward compat for even older versions of Python
args = (
obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals,
obj.co_stacksize, obj.co_flags, obj.co_code, obj.co_consts,
obj.co_names, obj.co_varnames, obj.co_filename,
obj.co_name, obj.co_firstlineno, obj.co_lnotab,
obj.co_freevars, obj.co_cellvars
obj.co_argcount,
obj.co_kwonlyargcount,
obj.co_nlocals,
obj.co_stacksize,
obj.co_flags,
obj.co_code,
obj.co_consts,
obj.co_names,
obj.co_varnames,
obj.co_filename,
obj.co_name,
obj.co_firstlineno,
obj.co_lnotab,
obj.co_freevars,
obj.co_cellvars,
)
return types.CodeType, args
@ -305,7 +350,7 @@ def _cell_reduce(obj):
except ValueError: # cell is empty
return _make_empty_cell, ()
else:
return _make_cell, (obj.cell_contents, )
return _make_cell, (obj.cell_contents,)
def _classmethod_reduce(obj):
@ -330,13 +375,10 @@ def _file_reduce(obj):
if obj.closed:
raise pickle.PicklingError("Cannot pickle closed files")
if hasattr(obj, "isatty") and obj.isatty():
raise pickle.PicklingError(
"Cannot pickle files that map to tty objects"
)
raise pickle.PicklingError("Cannot pickle files that map to tty objects")
if "r" not in obj.mode and "+" not in obj.mode:
raise pickle.PicklingError(
"Cannot pickle files that are not opened for reading: %s"
% obj.mode
"Cannot pickle files that are not opened for reading: %s" % obj.mode
)
name = obj.name
@ -381,7 +423,7 @@ def _module_reduce(obj):
# reason, we do not attempt to pickle the "__builtins__" entry, and
# restore a default value for it at unpickling time.
state = obj.__dict__.copy()
state.pop('__builtins__', None)
state.pop("__builtins__", None)
return dynamic_subimport, (obj.__name__, state)
@ -415,13 +457,21 @@ def _dynamic_class_reduce(obj):
"""
if Enum is not None and issubclass(obj, Enum):
return (
_make_skeleton_enum, _enum_getnewargs(obj), _enum_getstate(obj),
None, None, _class_setstate
_make_skeleton_enum,
_enum_getnewargs(obj),
_enum_getstate(obj),
None,
None,
_class_setstate,
)
else:
return (
_make_skeleton_class, _class_getnewargs(obj), _class_getstate(obj),
None, None, _class_setstate
_make_skeleton_class,
_class_getnewargs(obj),
_class_getstate(obj),
None,
None,
_class_setstate,
)
@ -444,18 +494,18 @@ def _dict_keys_reduce(obj):
# Safer not to ship the full dict as sending the rest might
# be unintended and could potentially cause leaking of
# sensitive information
return _make_dict_keys, (list(obj), )
return _make_dict_keys, (list(obj),)
def _dict_values_reduce(obj):
    """Reduce a dict values view to ``(_make_dict_values, (values_list,))``.

    Args:
        obj: The values view being pickled.

    Returns:
        A 2-tuple of the ``_make_dict_values`` reconstructor and a 1-tuple
        holding the view's values as a list.
    """
    # Safer not to ship the full dict as sending the rest might
    # be unintended and could potentially cause leaking of
    # sensitive information
    return _make_dict_values, (list(obj),)
def _dict_items_reduce(obj):
    """Reduce a dict items view to ``(_make_dict_items, (items_dict,))``.

    Args:
        obj: The items view being pickled.

    Returns:
        A 2-tuple of the ``_make_dict_items`` reconstructor and a 1-tuple
        holding a new ``dict`` built from the view's key/value pairs.
    """
    return _make_dict_items, (dict(obj),)
def _odict_keys_reduce(obj):
@ -563,7 +613,6 @@ class CloudPickler(Pickler):
_dispatch_table[type(OrderedDict().values())] = _odict_values_reduce
_dispatch_table[type(OrderedDict().items())] = _odict_items_reduce
dispatch_table = ChainMap(_dispatch_table, copyreg.dispatch_table)
# TODO(suquark): Remove this patch when we use numpy >= 1.20.0 by default.
# We import 'numpy.core' here, so numpy would register the
@ -580,8 +629,7 @@ class CloudPickler(Pickler):
"""Reduce a function that is not pickleable via attribute lookup."""
newargs = self._function_getnewargs(func)
state = _function_getstate(func)
return (types.FunctionType, newargs, state, None, None,
_function_setstate)
return (types.FunctionType, newargs, state, None, None, _function_setstate)
def _function_reduce(self, obj):
"""Reducer for function objects.
@ -628,8 +676,7 @@ class CloudPickler(Pickler):
if func.__closure__ is None:
closure = None
else:
closure = tuple(
_make_empty_cell() for _ in range(len(code.co_freevars)))
closure = tuple(_make_empty_cell() for _ in range(len(code.co_freevars)))
return code, base_globals, None, None, closure
@ -639,8 +686,7 @@ class CloudPickler(Pickler):
except RuntimeError as e:
if "recursion" in e.args[0]:
msg = (
"Could not pickle object as excessively deep recursion "
"required."
"Could not pickle object as excessively deep recursion " "required."
)
raise pickle.PicklingError(msg) from e
else:
@ -711,11 +757,13 @@ class CloudPickler(Pickler):
reducers, such as Exceptions. See
https://github.com/cloudpipe/cloudpickle/issues/248
"""
if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj): # noqa # pragma: no branch
if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(
obj
): # noqa # pragma: no branch
try:
return (
_create_parametrized_type_hint,
parametrized_type_hint_getinitargs(obj)
parametrized_type_hint_getinitargs(obj),
)
except pickle.PicklingError:
# There are some false positive cases in '_is_parametrized_type_hint'.
@ -751,15 +799,27 @@ class CloudPickler(Pickler):
# sharing the same global namespace at pickling time also share
# their global namespace at unpickling time.
self.globals_ref = {}
assert hasattr(self, 'proto')
assert hasattr(self, "proto")
def _save_reduce_pickle5(self, func, args, state=None, listitems=None,
dictitems=None, state_setter=None, obj=None):
def _save_reduce_pickle5(
self,
func,
args,
state=None,
listitems=None,
dictitems=None,
state_setter=None,
obj=None,
):
save = self.save
write = self.write
self.save_reduce(
func, args, state=None, listitems=listitems,
dictitems=dictitems, obj=obj
func,
args,
state=None,
listitems=listitems,
dictitems=dictitems,
obj=obj,
)
# backport of the Python 3.8 state_setter pickle operations
save(state_setter)
@ -789,9 +849,12 @@ class CloudPickler(Pickler):
return self.save_reduce(type, (NotImplemented,), obj=obj)
elif obj in _BUILTIN_TYPE_NAMES:
return self.save_reduce(
_builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj)
_builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj
)
if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj): # noqa # pragma: no branch
if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(
obj
): # noqa # pragma: no branch
# Parametrized typing constructs in Python < 3.7 are not
# compatible with type checks and ``isinstance`` semantics. For
# this reason, it is easier to detect them using a
@ -800,7 +863,7 @@ class CloudPickler(Pickler):
self.save_reduce(
_create_parametrized_type_hint,
parametrized_type_hint_getinitargs(obj),
obj=obj
obj=obj,
)
elif name is not None:
Pickler.save_global(self, obj, name=name)
@ -808,10 +871,11 @@ class CloudPickler(Pickler):
self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj)
else:
Pickler.save_global(self, obj, name=name)
dispatch[type] = save_global
def save_function(self, obj, name=None):
""" Registered with the dispatch to handle all function types.
"""Registered with the dispatch to handle all function types.
Determines what kind of function obj is (e.g. lambda, defined at
interactive prompt, etc) and handles the pickling appropriately.
@ -842,9 +906,11 @@ class CloudPickler(Pickler):
this routing should be removed when cloudpickle supports only PyPy
3.6 and later.
"""
rv = (types.FunctionType, (obj.__code__, {}, obj.__name__,
obj.__defaults__, obj.__closure__),
obj.__dict__)
rv = (
types.FunctionType,
(obj.__code__, {}, obj.__name__, obj.__defaults__, obj.__closure__),
obj.__dict__,
)
self.save_reduce(*rv, obj=obj)
dispatch[types.FunctionType] = save_function