diff --git a/python/ray/cloudpickle/__init__.py b/python/ray/cloudpickle/__init__.py index 91c46bb76..b28e91ee8 100644 --- a/python/ray/cloudpickle/__init__.py +++ b/python/ray/cloudpickle/__init__.py @@ -1,3 +1,11 @@ -from ray.cloudpickle.cloudpickle_fast import * # noqa: F401, F403 +from __future__ import absolute_import -__version__ = '1.4.1' + +from ray.cloudpickle.cloudpickle import * # noqa +from ray.cloudpickle.cloudpickle_fast import CloudPickler, dumps, dump # noqa + +# Conform to the convention used by python serialization libraries, which +# expose their Pickler subclass at top-level under the "Pickler" name. +Pickler = CloudPickler + +__version__ = '1.6.0' diff --git a/python/ray/cloudpickle/cloudpickle.py b/python/ray/cloudpickle/cloudpickle.py index c639daab1..05d52afa0 100644 --- a/python/ray/cloudpickle/cloudpickle.py +++ b/python/ray/cloudpickle/cloudpickle.py @@ -42,29 +42,21 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ from __future__ import print_function -import abc import builtins import dis -import io -import itertools -import logging import opcode -import operator -import pickle import platform -import struct import sys import types import weakref import uuid import threading import typing -from enum import Enum +import warnings +from .compat import pickle from typing import Generic, Union, Tuple, Callable -from pickle import _Pickler as Pickler from pickle import _getattribute -from io import BytesIO from importlib._bootstrap import _find_spec try: # pragma: no branch @@ -78,6 +70,17 @@ if sys.version_info >= (3, 5, 3): else: # pragma: no cover ClassVar = None +if sys.version_info >= (3, 8): + from types import CellType +else: + def f(): + a = 1 + + def g(): + return a + return g + CellType = type(f().__closure__[0]) + # cloudpickle is meant for inter process communication: we expect all # communicating processes to run the same Python version hence we favor @@ -163,9 +166,24 @@ def _whichmodule(obj, name): return None -def _is_importable_by_name(obj, name=None): - """Determine if obj can be pickled as attribute of a file-backed module""" - return _lookup_module_and_qualname(obj, name=name) is not None +def _is_importable(obj, name=None): + """Dispatcher utility to test the importability of various constructs.""" + if isinstance(obj, types.FunctionType): + return _lookup_module_and_qualname(obj, name=name) is not None + elif issubclass(type(obj), type): + return _lookup_module_and_qualname(obj, name=name) is not None + elif isinstance(obj, types.ModuleType): + # We assume that sys.modules is primarily used as a cache mechanism for + # the Python import machinery. Checking if a module has been added in + # is sys.modules therefore a cheap and simple heuristic to tell us whether + # we can assume that a given module could be imported by name in + # another Python process. + return obj.__name__ in sys.modules + else: + raise TypeError( + "cannot check importability of {} instances".format( + type(obj).__name__) + ) def _lookup_module_and_qualname(obj, name=None): @@ -187,6 +205,8 @@ def _lookup_module_and_qualname(obj, name=None): if module_name == "__main__": return None + # Note: if module_name is in sys.modules, the corresponding module is + # assumed importable at unpickling time. See #357 module = sys.modules.get(module_name, None) if module is None: # The main reason why obj's module would not be imported is that this @@ -196,10 +216,6 @@ def _lookup_module_and_qualname(obj, name=None): # supported, as the standard pickle does not support it either. return None - # module has been added to sys.modules, but it can still be dynamic. - if _is_dynamic(module): - return None - try: obj2, parent = _getattribute(module, name) except AttributeError: @@ -458,577 +474,61 @@ if sys.version_info[:2] < (3, 7): # pragma: no branch def _create_parametrized_type_hint(origin, args): return origin[args] +else: + _is_parametrized_type_hint = None + _create_parametrized_type_hint = None -class CloudPickler(Pickler): +def parametrized_type_hint_getinitargs(obj): + # The distorted type check sematic for typing construct becomes: + # ``type(obj) is type(TypeHint)``, which means "obj is a + # parametrized TypeHint" + if type(obj) is type(Literal): # pragma: no branch + initargs = (Literal, obj.__values__) + elif type(obj) is type(Final): # pragma: no branch + initargs = (Final, obj.__type__) + elif type(obj) is type(ClassVar): + initargs = (ClassVar, obj.__type__) + elif type(obj) is type(Generic): + parameters = obj.__parameters__ + if len(obj.__parameters__) > 0: + # in early Python 3.5, __parameters__ was sometimes + # preferred to __args__ + initargs = (obj.__origin__, parameters) - dispatch = Pickler.dispatch.copy() - - def __init__(self, file, protocol=None): - if protocol is None: - protocol = DEFAULT_PROTOCOL - Pickler.__init__(self, file, protocol=protocol) - # map ids to dictionary. used to ensure that functions can share global env - self.globals_ref = {} - - def dump(self, obj): - self.inject_addons() - try: - return Pickler.dump(self, obj) - except RuntimeError as e: - if 'recursion' in e.args[0]: - msg = """Could not pickle object as excessively deep recursion required.""" - raise pickle.PicklingError(msg) + else: + initargs = (obj.__origin__, obj.__args__) + elif type(obj) is type(Union): + if sys.version_info < (3, 5, 3): # pragma: no cover + initargs = (Union, obj.__union_params__) + else: + initargs = (Union, obj.__args__) + elif type(obj) is type(Tuple): + if sys.version_info < (3, 5, 3): # pragma: no cover + initargs = (Tuple, obj.__tuple_params__) + else: + initargs = (Tuple, obj.__args__) + elif type(obj) is type(Callable): + if sys.version_info < (3, 5, 3): # pragma: no cover + args = obj.__args__ + result = obj.__result__ + if args != Ellipsis: + if isinstance(args, tuple): + args = list(args) + else: + args = [args] + else: + (*args, result) = obj.__args__ + if len(args) == 1 and args[0] is Ellipsis: + args = Ellipsis else: - raise - - def save_typevar(self, obj): - self.save_reduce(*_typevar_reduce(obj), obj=obj) - - dispatch[typing.TypeVar] = save_typevar - - def save_memoryview(self, obj): - self.save(obj.tobytes()) - - dispatch[memoryview] = save_memoryview - - def save_module(self, obj): - """ - Save a module as an import - """ - if _is_dynamic(obj): - obj.__dict__.pop('__builtins__', None) - self.save_reduce(dynamic_subimport, (obj.__name__, vars(obj)), - obj=obj) - else: - self.save_reduce(subimport, (obj.__name__,), obj=obj) - - dispatch[types.ModuleType] = save_module - - def save_codeobject(self, obj): - """ - Save a code object - """ - if hasattr(obj, "co_posonlyargcount"): # pragma: no branch - args = ( - obj.co_argcount, obj.co_posonlyargcount, - obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, - obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, - obj.co_varnames, obj.co_filename, obj.co_name, - obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, - obj.co_cellvars - ) - else: - args = ( - obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, - obj.co_stacksize, obj.co_flags, obj.co_code, obj.co_consts, - obj.co_names, obj.co_varnames, obj.co_filename, - obj.co_name, obj.co_firstlineno, obj.co_lnotab, - obj.co_freevars, obj.co_cellvars - ) - self.save_reduce(types.CodeType, args, obj=obj) - - dispatch[types.CodeType] = save_codeobject - - def save_function(self, obj, name=None): - """ Registered with the dispatch to handle all function types. - - Determines what kind of function obj is (e.g. lambda, defined at - interactive prompt, etc) and handles the pickling appropriately. - """ - if _is_importable_by_name(obj, name=name): - return Pickler.save_global(self, obj, name=name) - elif PYPY and isinstance(obj.__code__, builtin_code_type): - return self.save_pypy_builtin_func(obj) - else: - return self.save_function_tuple(obj) - - dispatch[types.FunctionType] = save_function - - def save_pypy_builtin_func(self, obj): - """Save pypy equivalent of builtin functions. - - PyPy does not have the concept of builtin-functions. Instead, - builtin-functions are simple function instances, but with a - builtin-code attribute. - Most of the time, builtin functions should be pickled by attribute. But - PyPy has flaky support for __qualname__, so some builtin functions such - as float.__new__ will be classified as dynamic. For this reason only, - we created this special routine. Because builtin-functions are not - expected to have closure or globals, there is no additional hack - (compared the one already implemented in pickle) to protect ourselves - from reference cycles. A simple (reconstructor, newargs, obj.__dict__) - tuple is save_reduced. - - Note also that PyPy improved their support for __qualname__ in v3.6, so - this routing should be removed when cloudpickle supports only PyPy 3.6 - and later. - """ - rv = (types.FunctionType, (obj.__code__, {}, obj.__name__, - obj.__defaults__, obj.__closure__), - obj.__dict__) - self.save_reduce(*rv, obj=obj) - - def _save_dynamic_enum(self, obj, clsdict): - """Special handling for dynamic Enum subclasses - - Use a dedicated Enum constructor (inspired by EnumMeta.__call__) as the - EnumMeta metaclass has complex initialization that makes the Enum - subclasses hold references to their own instances. - """ - members = dict((e.name, e.value) for e in obj) - - self.save_reduce( - _make_skeleton_enum, - (obj.__bases__, obj.__name__, obj.__qualname__, - members, obj.__module__, _get_or_create_tracker_id(obj), None), - obj=obj - ) - - # Cleanup the clsdict that will be passed to _rehydrate_skeleton_class: - # Those attributes are already handled by the metaclass. - for attrname in ["_generate_next_value_", "_member_names_", - "_member_map_", "_member_type_", - "_value2member_map_"]: - clsdict.pop(attrname, None) - for member in members: - clsdict.pop(member) - - def save_dynamic_class(self, obj): - """Save a class that can't be stored as module global. - - This method is used to serialize classes that are defined inside - functions, or that otherwise can't be serialized as attribute lookups - from global modules. - """ - clsdict = _extract_class_dict(obj) - clsdict.pop('__weakref__', None) - - if issubclass(type(obj), abc.ABCMeta): - # If obj is an instance of an ABCMeta subclass, dont pickle the - # cache/negative caches populated during isinstance/issubclass - # checks, but pickle the list of registered subclasses of obj. - clsdict.pop('_abc_cache', None) - clsdict.pop('_abc_negative_cache', None) - clsdict.pop('_abc_negative_cache_version', None) - registry = clsdict.pop('_abc_registry', None) - if registry is None: - # in Python3.7+, the abc caches and registered subclasses of a - # class are bundled into the single _abc_impl attribute - clsdict.pop('_abc_impl', None) - (registry, _, _, _) = abc._get_dump(obj) - - clsdict["_abc_impl"] = [subclass_weakref() - for subclass_weakref in registry] - else: - # In the above if clause, registry is a set of weakrefs -- in - # this case, registry is a WeakSet - clsdict["_abc_impl"] = [type_ for type_ in registry] - - # On PyPy, __doc__ is a readonly attribute, so we need to include it in - # the initial skeleton class. This is safe because we know that the - # doc can't participate in a cycle with the original class. - type_kwargs = {'__doc__': clsdict.pop('__doc__', None)} - - if "__slots__" in clsdict: - type_kwargs['__slots__'] = obj.__slots__ - # pickle string length optimization: member descriptors of obj are - # created automatically from obj's __slots__ attribute, no need to - # save them in obj's state - if isinstance(obj.__slots__, str): - clsdict.pop(obj.__slots__) - else: - for k in obj.__slots__: - clsdict.pop(k, None) - - # If type overrides __dict__ as a property, include it in the type - # kwargs. In Python 2, we can't set this attribute after construction. - # XXX: can this ever happen in Python 3? If so add a test. - __dict__ = clsdict.pop('__dict__', None) - if isinstance(__dict__, property): - type_kwargs['__dict__'] = __dict__ - - save = self.save - write = self.write - - # We write pickle instructions explicitly here to handle the - # possibility that the type object participates in a cycle with its own - # __dict__. We first write an empty "skeleton" version of the class and - # memoize it before writing the class' __dict__ itself. We then write - # instructions to "rehydrate" the skeleton class by restoring the - # attributes from the __dict__. - # - # A type can appear in a cycle with its __dict__ if an instance of the - # type appears in the type's __dict__ (which happens for the stdlib - # Enum class), or if the type defines methods that close over the name - # of the type, (which is common for Python 2-style super() calls). - - # Push the rehydration function. - save(_rehydrate_skeleton_class) - - # Mark the start of the args tuple for the rehydration function. - write(pickle.MARK) - - # Create and memoize an skeleton class with obj's name and bases. - if Enum is not None and issubclass(obj, Enum): - # Special handling of Enum subclasses - self._save_dynamic_enum(obj, clsdict) - else: - # "Regular" class definition: - tp = type(obj) - self.save_reduce(_make_skeleton_class, - (tp, obj.__name__, _get_bases(obj), type_kwargs, - _get_or_create_tracker_id(obj), None), - obj=obj) - - # Now save the rest of obj's __dict__. Any references to obj - # encountered while saving will point to the skeleton class. - save(clsdict) - - # Write a tuple of (skeleton_class, clsdict). - write(pickle.TUPLE) - - # Call _rehydrate_skeleton_class(skeleton_class, clsdict) - write(pickle.REDUCE) - - def save_function_tuple(self, func): - """ Pickles an actual func object. - - A func comprises: code, globals, defaults, closure, and dict. We - extract and save these, injecting reducing functions at certain points - to recreate the func object. Keep in mind that some of these pieces - can contain a ref to the func itself. Thus, a naive save on these - pieces could trigger an infinite loop of save's. To get around that, - we first create a skeleton func object using just the code (this is - safe, since this won't contain a ref to the func), and memoize it as - soon as it's created. The other stuff can then be filled in later. - """ - if is_tornado_coroutine(func): - self.save_reduce(_rebuild_tornado_coroutine, (func.__wrapped__,), - obj=func) - return - - save = self.save - write = self.write - - code, f_globals, defaults, closure_values, dct, base_globals = self.extract_func_data(func) - - save(_fill_function) # skeleton function updater - write(pickle.MARK) # beginning of tuple that _fill_function expects - - # Extract currently-imported submodules used by func. Storing these - # modules in a smoke _cloudpickle_subimports attribute of the object's - # state will trigger the side effect of importing these modules at - # unpickling time (which is necessary for func to work correctly once - # depickled) - submodules = _find_imported_submodules( - code, - itertools.chain(f_globals.values(), closure_values or ()), + args = list(args) + initargs = (Callable, (args, result)) + else: # pragma: no cover + raise pickle.PicklingError( + "Cloudpickle Error: Unknown type {}".format(type(obj)) ) - - # create a skeleton function object and memoize it - save(_make_skel_func) - save(( - code, - len(closure_values) if closure_values is not None else -1, - base_globals, - )) - write(pickle.REDUCE) - self.memoize(func) - - # save the rest of the func data needed by _fill_function - state = { - 'globals': f_globals, - 'defaults': defaults, - 'dict': dct, - 'closure_values': closure_values, - 'module': func.__module__, - 'name': func.__name__, - 'doc': func.__doc__, - '_cloudpickle_submodules': submodules - } - if hasattr(func, '__annotations__'): - state['annotations'] = func.__annotations__ - if hasattr(func, '__qualname__'): - state['qualname'] = func.__qualname__ - if hasattr(func, '__kwdefaults__'): - state['kwdefaults'] = func.__kwdefaults__ - save(state) - write(pickle.TUPLE) - write(pickle.REDUCE) # applies _fill_function on the tuple - - def extract_func_data(self, func): - """ - Turn the function into a tuple of data necessary to recreate it: - code, globals, defaults, closure_values, dict - """ - code = func.__code__ - - # extract all global ref's - func_global_refs = _extract_code_globals(code) - - # process all variables referenced by global environment - f_globals = {} - for var in func_global_refs: - if var in func.__globals__: - f_globals[var] = func.__globals__[var] - - # defaults requires no processing - defaults = func.__defaults__ - - # process closure - closure = ( - list(map(_get_cell_contents, func.__closure__)) - if func.__closure__ is not None - else None - ) - - # save the dict - dct = func.__dict__ - - # base_globals represents the future global namespace of func at - # unpickling time. Looking it up and storing it in globals_ref allow - # functions sharing the same globals at pickling time to also - # share them once unpickled, at one condition: since globals_ref is - # an attribute of a Cloudpickler instance, and that a new CloudPickler is - # created each time pickle.dump or pickle.dumps is called, functions - # also need to be saved within the same invokation of - # cloudpickle.dump/cloudpickle.dumps (for example: cloudpickle.dumps([f1, f2])). There - # is no such limitation when using Cloudpickler.dump, as long as the - # multiple invokations are bound to the same Cloudpickler. - base_globals = self.globals_ref.setdefault(id(func.__globals__), {}) - - if base_globals == {}: - # Add module attributes used to resolve relative imports - # instructions inside func. - for k in ["__package__", "__name__", "__path__", "__file__"]: - # Some built-in functions/methods such as object.__new__ have - # their __globals__ set to None in PyPy - if func.__globals__ is not None and k in func.__globals__: - base_globals[k] = func.__globals__[k] - - return (code, f_globals, defaults, closure, dct, base_globals) - - def save_getset_descriptor(self, obj): - return self.save_reduce(getattr, (obj.__objclass__, obj.__name__)) - - dispatch[types.GetSetDescriptorType] = save_getset_descriptor - - def save_global(self, obj, name=None, pack=struct.pack): - """ - Save a "global". - - The name of this method is somewhat misleading: all types get - dispatched here. - """ - if obj is type(None): - return self.save_reduce(type, (None,), obj=obj) - elif obj is type(Ellipsis): - return self.save_reduce(type, (Ellipsis,), obj=obj) - elif obj is type(NotImplemented): - return self.save_reduce(type, (NotImplemented,), obj=obj) - elif obj in _BUILTIN_TYPE_NAMES: - return self.save_reduce( - _builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj) - - if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj): # noqa # pragma: no branch - # Parametrized typing constructs in Python < 3.7 are not compatible - # with type checks and ``isinstance`` semantics. For this reason, - # it is easier to detect them using a duck-typing-based check - # (``_is_parametrized_type_hint``) than to populate the Pickler's - # dispatch with type-specific savers. - self._save_parametrized_type_hint(obj) - elif name is not None: - Pickler.save_global(self, obj, name=name) - elif not _is_importable_by_name(obj, name=name): - self.save_dynamic_class(obj) - else: - Pickler.save_global(self, obj, name=name) - - dispatch[type] = save_global - - def save_instancemethod(self, obj): - # Memoization rarely is ever useful due to python bounding - if obj.__self__ is None: - self.save_reduce(getattr, (obj.im_class, obj.__name__)) - else: - self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj) - - dispatch[types.MethodType] = save_instancemethod - - def save_property(self, obj): - # properties not correctly saved in python - self.save_reduce(property, (obj.fget, obj.fset, obj.fdel, obj.__doc__), - obj=obj) - - dispatch[property] = save_property - - def save_classmethod(self, obj): - orig_func = obj.__func__ - self.save_reduce(type(obj), (orig_func,), obj=obj) - - dispatch[classmethod] = save_classmethod - dispatch[staticmethod] = save_classmethod - - def save_itemgetter(self, obj): - """itemgetter serializer (needed for namedtuple support)""" - class Dummy: - def __getitem__(self, item): - return item - items = obj(Dummy()) - if not isinstance(items, tuple): - items = (items,) - return self.save_reduce(operator.itemgetter, items) - - if type(operator.itemgetter) is type: - dispatch[operator.itemgetter] = save_itemgetter - - def save_attrgetter(self, obj): - """attrgetter serializer""" - class Dummy(object): - def __init__(self, attrs, index=None): - self.attrs = attrs - self.index = index - def __getattribute__(self, item): - attrs = object.__getattribute__(self, "attrs") - index = object.__getattribute__(self, "index") - if index is None: - index = len(attrs) - attrs.append(item) - else: - attrs[index] = ".".join([attrs[index], item]) - return type(self)(attrs, index) - attrs = [] - obj(Dummy(attrs)) - return self.save_reduce(operator.attrgetter, tuple(attrs)) - - if type(operator.attrgetter) is type: - dispatch[operator.attrgetter] = save_attrgetter - - def save_file(self, obj): - """Save a file""" - - if not hasattr(obj, 'name') or not hasattr(obj, 'mode'): - raise pickle.PicklingError("Cannot pickle files that do not map to an actual file") - if obj is sys.stdout: - return self.save_reduce(getattr, (sys, 'stdout'), obj=obj) - if obj is sys.stderr: - return self.save_reduce(getattr, (sys, 'stderr'), obj=obj) - if obj is sys.stdin: - raise pickle.PicklingError("Cannot pickle standard input") - if obj.closed: - raise pickle.PicklingError("Cannot pickle closed files") - if hasattr(obj, 'isatty') and obj.isatty(): - raise pickle.PicklingError("Cannot pickle files that map to tty objects") - if 'r' not in obj.mode and '+' not in obj.mode: - raise pickle.PicklingError("Cannot pickle files that are not opened for reading: %s" % obj.mode) - - name = obj.name - - # TODO: also support binary mode files with io.BytesIO - retval = io.StringIO() - - try: - # Read the whole file - curloc = obj.tell() - obj.seek(0) - contents = obj.read() - obj.seek(curloc) - except IOError: - raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name) - retval.write(contents) - retval.seek(curloc) - - retval.name = name - self.save(retval) - self.memoize(obj) - - def save_ellipsis(self, obj): - self.save_reduce(_gen_ellipsis, ()) - - def save_not_implemented(self, obj): - self.save_reduce(_gen_not_implemented, ()) - - dispatch[io.TextIOWrapper] = save_file - dispatch[type(Ellipsis)] = save_ellipsis - dispatch[type(NotImplemented)] = save_not_implemented - - def save_weakset(self, obj): - self.save_reduce(weakref.WeakSet, (list(obj),)) - - dispatch[weakref.WeakSet] = save_weakset - - def save_logger(self, obj): - self.save_reduce(logging.getLogger, (obj.name,), obj=obj) - - dispatch[logging.Logger] = save_logger - - def save_root_logger(self, obj): - self.save_reduce(logging.getLogger, (), obj=obj) - - dispatch[logging.RootLogger] = save_root_logger - - if hasattr(types, "MappingProxyType"): # pragma: no branch - def save_mappingproxy(self, obj): - self.save_reduce(types.MappingProxyType, (dict(obj),), obj=obj) - - dispatch[types.MappingProxyType] = save_mappingproxy - - """Special functions for Add-on libraries""" - def inject_addons(self): - """Plug in system. Register additional pickling functions if modules already loaded""" - pass - - if sys.version_info < (3, 7): # pragma: no branch - def _save_parametrized_type_hint(self, obj): - # The distorted type check sematic for typing construct becomes: - # ``type(obj) is type(TypeHint)``, which means "obj is a - # parametrized TypeHint" - if type(obj) is type(Literal): # pragma: no branch - initargs = (Literal, obj.__values__) - elif type(obj) is type(Final): # pragma: no branch - initargs = (Final, obj.__type__) - elif type(obj) is type(ClassVar): - initargs = (ClassVar, obj.__type__) - elif type(obj) is type(Generic): - parameters = obj.__parameters__ - if len(obj.__parameters__) > 0: - # in early Python 3.5, __parameters__ was sometimes - # preferred to __args__ - initargs = (obj.__origin__, parameters) - else: - initargs = (obj.__origin__, obj.__args__) - elif type(obj) is type(Union): - if sys.version_info < (3, 5, 3): # pragma: no cover - initargs = (Union, obj.__union_params__) - else: - initargs = (Union, obj.__args__) - elif type(obj) is type(Tuple): - if sys.version_info < (3, 5, 3): # pragma: no cover - initargs = (Tuple, obj.__tuple_params__) - else: - initargs = (Tuple, obj.__args__) - elif type(obj) is type(Callable): - if sys.version_info < (3, 5, 3): # pragma: no cover - args = obj.__args__ - result = obj.__result__ - if args != Ellipsis: - if isinstance(args, tuple): - args = list(args) - else: - args = [args] - else: - (*args, result) = obj.__args__ - if len(args) == 1 and args[0] is Ellipsis: - args = Ellipsis - else: - args = list(args) - initargs = (Callable, (args, result)) - else: # pragma: no cover - raise pickle.PicklingError( - "Cloudpickle Error: Unknown type {}".format(type(obj)) - ) - self.save_reduce(_create_parametrized_type_hint, initargs, obj=obj) + return initargs # Tornado support @@ -1052,40 +552,6 @@ def _rebuild_tornado_coroutine(func): return gen.coroutine(func) -# Shorthands for legacy support - -def dump(obj, file, protocol=None): - """Serialize obj as bytes streamed into file - - protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to - pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed - between processes running the same Python version. - - Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure - compatibility with older versions of Python. - """ - CloudPickler(file, protocol=protocol).dump(obj) - - -def dumps(obj, protocol=None): - """Serialize obj as a string of bytes allocated in memory - - protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to - pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed - between processes running the same Python version. - - Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure - compatibility with older versions of Python. - """ - file = BytesIO() - try: - cp = CloudPickler(file, protocol=protocol) - cp.dump(obj) - return file.getvalue() - finally: - file.close() - - # including pickles unloading functions in this namespace load = pickle.load loads = pickle.loads @@ -1184,7 +650,7 @@ def _fill_function(*args): if 'annotations' in state: func.__annotations__ = state['annotations'] if 'doc' in state: - func.__doc__ = state['doc'] + func.__doc__ = state['doc'] if 'name' in state: func.__name__ = state['name'] if 'module' in state: @@ -1219,11 +685,24 @@ def _make_empty_cell(): return (lambda: cell).__closure__[0] +def _make_cell(value=_empty_cell_value): + cell = _make_empty_cell() + if value is not _empty_cell_value: + cell_set(cell, value) + return cell + + def _make_skel_func(code, cell_count, base_globals=None): """ Creates a skeleton function object that contains just the provided code and the correct number of cells in func_closure. All other func attributes (e.g. func_globals) are empty. """ + # This function is deprecated and should be removed in cloudpickle 1.7 + warnings.warn( + "A pickle file created using an old (<=1.4.1) version of cloudpicke " + "is currently being loaded. This is not supported by cloudpickle and " + "will break in cloudpickle 1.7", category=UserWarning + ) # This is backward-compatibility code: for cloudpickle versions between # 0.5.4 and 0.7, base_globals could be a string or None. base_globals # should now always be a dictionary. @@ -1307,39 +786,6 @@ def _make_skeleton_enum(bases, name, qualname, members, module, return _lookup_class_or_track(class_tracker_id, enum_class) -def _is_dynamic(module): - """ - Return True if the module is special module that cannot be imported by its - name. - """ - # Quick check: module that have __file__ attribute are not dynamic modules. - if hasattr(module, '__file__'): - return False - - if module.__spec__ is not None: - return False - - # In PyPy, Some built-in modules such as _codecs can have their - # __spec__ attribute set to None despite being imported. For such - # modules, the ``_find_spec`` utility of the standard library is used. - parent_name = module.__name__.rpartition('.')[0] - if parent_name: # pragma: no cover - # This code handles the case where an imported package (and not - # module) remains with __spec__ set to None. It is however untested - # as no package in the PyPy stdlib has __spec__ set to None after - # it is imported. - try: - parent = sys.modules[parent_name] - except KeyError: - msg = "parent {!r} not in sys.modules" - raise ImportError(msg.format(parent_name)) - else: - pkgpath = parent.__path__ - else: - pkgpath = None - return _find_spec(module.__name__, pkgpath, module) is None - - def _make_typevar(name, bound, constraints, covariant, contravariant, class_tracker_id): tv = typing.TypeVar( @@ -1382,3 +828,15 @@ def _get_bases(typ): # For regular class objects bases_attr = '__bases__' return getattr(typ, bases_attr) + + +def _make_dict_keys(obj): + return dict.fromkeys(obj).keys() + + +def _make_dict_values(obj): + return {i: _ for i, _ in enumerate(obj)}.values() + + +def _make_dict_items(obj): + return obj.items() diff --git a/python/ray/cloudpickle/cloudpickle_fast.py b/python/ray/cloudpickle/cloudpickle_fast.py index fd16493b8..fa8da0f63 100644 --- a/python/ray/cloudpickle/cloudpickle_fast.py +++ b/python/ray/cloudpickle/cloudpickle_fast.py @@ -10,65 +10,100 @@ Note that the C Pickler sublassing API is CPython-specific. Therefore, some guards present in cloudpickle.py that were written to handle PyPy specificities are not present in cloudpickle_fast.py """ +import _collections_abc import abc import copyreg import io import itertools import logging import sys +import struct import types import weakref import typing +from enum import Enum +from collections import ChainMap + +from .compat import pickle, Pickler from .cloudpickle import ( - _is_dynamic, _extract_code_globals, _BUILTIN_TYPE_NAMES, DEFAULT_PROTOCOL, - _find_imported_submodules, _get_cell_contents, _is_importable_by_name, _builtin_type, - Enum, _get_or_create_tracker_id, _make_skeleton_class, _make_skeleton_enum, - _extract_class_dict, dynamic_subimport, subimport, _typevar_reduce, _get_bases, - cell_set, _make_empty_cell, + _extract_code_globals, _BUILTIN_TYPE_NAMES, DEFAULT_PROTOCOL, + _find_imported_submodules, _get_cell_contents, _is_importable, + _builtin_type, _get_or_create_tracker_id, _make_skeleton_class, + _make_skeleton_enum, _extract_class_dict, dynamic_subimport, subimport, + _typevar_reduce, _get_bases, _make_cell, _make_empty_cell, CellType, + _is_parametrized_type_hint, PYPY, cell_set, + parametrized_type_hint_getinitargs, _create_parametrized_type_hint, + builtin_code_type, + _make_dict_keys, _make_dict_values, _make_dict_items, ) -if sys.version_info[:2] < (3, 8): - import pickle5 as pickle - from pickle5 import Pickler - load, loads = pickle.load, pickle.loads + +if pickle.HIGHEST_PROTOCOL >= 5 and not PYPY: + # Shorthands similar to pickle.dump/pickle.dumps + + def dump(obj, file, protocol=None, buffer_callback=None): + """Serialize obj as bytes streamed into file + + protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to + pickle.HIGHEST_PROTOCOL. This setting favors maximum communication + speed between processes running the same Python version. + + Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure + compatibility with older versions of Python. + """ + CloudPickler( + file, protocol=protocol, buffer_callback=buffer_callback + ).dump(obj) + + def dumps(obj, protocol=None, buffer_callback=None): + """Serialize obj as a string of bytes allocated in memory + + protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to + pickle.HIGHEST_PROTOCOL. This setting favors maximum communication + speed between processes running the same Python version. + + Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure + compatibility with older versions of Python. + """ + with io.BytesIO() as file: + cp = CloudPickler( + file, protocol=protocol, buffer_callback=buffer_callback + ) + cp.dump(obj) + return file.getvalue() + else: - import _pickle - import pickle - from _pickle import Pickler - load, loads = _pickle.load, _pickle.loads + # Shorthands similar to pickle.dump/pickle.dumps + def dump(obj, file, protocol=None): + """Serialize obj as bytes streamed into file -import numpy + protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to + pickle.HIGHEST_PROTOCOL. This setting favors maximum communication + speed between processes running the same Python version. + + Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure + compatibility with older versions of Python. + """ + CloudPickler(file, protocol=protocol).dump(obj) + + def dumps(obj, protocol=None): + """Serialize obj as a string of bytes allocated in memory + + protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to + pickle.HIGHEST_PROTOCOL. This setting favors maximum communication + speed between processes running the same Python version. + + Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure + compatibility with older versions of Python. + """ + with io.BytesIO() as file: + cp = CloudPickler(file, protocol=protocol) + cp.dump(obj) + return file.getvalue() -# Shorthands similar to pickle.dump/pickle.dumps -def dump(obj, file, protocol=None, buffer_callback=None): - """Serialize obj as bytes streamed into file - - protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to - pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed - between processes running the same Python version. - - Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure - compatibility with older versions of Python. - """ - CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback).dump(obj) - - -def dumps(obj, protocol=None, buffer_callback=None): - """Serialize obj as a string of bytes allocated in memory - - protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to - pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed - between processes running the same Python version. - - Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure - compatibility with older versions of Python. - """ - with io.BytesIO() as file: - cp = CloudPickler(file, protocol=protocol, buffer_callback=buffer_callback) - cp.dump(obj) - return file.getvalue() +load, loads = pickle.load, pickle.loads # COLLECTION OF OBJECTS __getnewargs__-LIKE METHODS @@ -151,21 +186,15 @@ def _class_getstate(obj): clsdict.pop('_abc_cache', None) clsdict.pop('_abc_negative_cache', None) clsdict.pop('_abc_negative_cache_version', None) - clsdict.pop('_abc_impl', None) registry = clsdict.pop('_abc_registry', None) if registry is None: # in Python3.7+, the abc caches and registered subclasses of a # class are bundled into the single _abc_impl attribute - if hasattr(abc, '_get_dump'): - (registry, _, _, _) = abc._get_dump(obj) - clsdict["_abc_impl"] = [subclass_weakref() - for subclass_weakref in registry] - else: - # FIXME(suquark): The upstream cloudpickle cannot work in Ray - # because sometimes both '_abc_registry' and '_get_dump' does - # not exist. Some strange typing objects may cause this issue. - # Here the workaround just set "_abc_impl" to None. - clsdict["_abc_impl"] = None + clsdict.pop('_abc_impl', None) + (registry, _, _, _) = abc._get_dump(obj) + + clsdict["_abc_impl"] = [subclass_weakref() + for subclass_weakref in registry] else: # In the above if clause, registry is a set of weakrefs -- in # this case, registry is a WeakSet @@ -217,13 +246,13 @@ def _code_reduce(obj): """codeobject reducer""" if hasattr(obj, "co_posonlyargcount"): # pragma: no branch args = ( - obj.co_argcount, obj.co_posonlyargcount, - obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, - obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, - obj.co_varnames, obj.co_filename, obj.co_name, - obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, - obj.co_cellvars - ) + obj.co_argcount, obj.co_posonlyargcount, + obj.co_kwonlyargcount, obj.co_nlocals, obj.co_stacksize, + obj.co_flags, obj.co_code, obj.co_consts, obj.co_names, + obj.co_varnames, obj.co_filename, obj.co_name, + obj.co_firstlineno, obj.co_lnotab, obj.co_freevars, + obj.co_cellvars + ) else: args = ( obj.co_argcount, obj.co_kwonlyargcount, obj.co_nlocals, @@ -235,26 +264,14 @@ def _code_reduce(obj): return types.CodeType, args -def _make_cell(contents): - cell = _make_empty_cell() - cell_set(cell, contents) - return cell - - def _cell_reduce(obj): """Cell (containing values of a function's free variables) reducer""" try: - contents = (obj.cell_contents,) + obj.cell_contents except ValueError: # cell is empty - contents = () - - if sys.version_info[:2] < (3, 8): - if contents: - return _make_cell, contents - else: - return _make_empty_cell, () + return _make_empty_cell, () else: - return types.CellType, contents + return _make_cell, (obj.cell_contents, ) def _classmethod_reduce(obj): @@ -298,10 +315,10 @@ def _file_reduce(obj): obj.seek(0) contents = obj.read() obj.seek(curloc) - except IOError: + except IOError as e: raise pickle.PicklingError( "Cannot pickle file %s as it cannot be read" % name - ) + ) from e retval.write(contents) retval.seek(curloc) @@ -322,11 +339,11 @@ def _memoryview_reduce(obj): def _module_reduce(obj): - if _is_dynamic(obj): + if _is_importable(obj): + return subimport, (obj.__name__,) + else: obj.__dict__.pop('__builtins__', None) return dynamic_subimport, (obj.__name__, vars(obj)) - else: - return subimport, (obj.__name__,) def _method_reduce(obj): @@ -379,11 +396,29 @@ def _class_reduce(obj): return type, (NotImplemented,) elif obj in _BUILTIN_TYPE_NAMES: return _builtin_type, (_BUILTIN_TYPE_NAMES[obj],) - elif not _is_importable_by_name(obj): + elif not _is_importable(obj): return _dynamic_class_reduce(obj) return NotImplemented +def _dict_keys_reduce(obj): + # Safer not to ship the full dict as sending the rest might + # be unintended and could potentially cause leaking of + # sensitive information + return _make_dict_keys, (list(obj), ) + + +def _dict_values_reduce(obj): + # Safer not to ship the full dict as sending the rest might + # be unintended and could potentially cause leaking of + # sensitive information + return _make_dict_values, (list(obj), ) + + +def _dict_items_reduce(obj): + return _make_dict_items, (dict(obj), ) + + # COLLECTIONS OF OBJECTS STATE SETTERS # ------------------------------------ # state setters are called at unpickling time, once the object is created and @@ -439,154 +474,30 @@ def _class_setstate(obj, state): return obj -def _numpy_frombuffer(buffer, dtype, shape, order): - # Get the _frombuffer() function for reconstruction - from numpy.core.numeric import _frombuffer - array = _frombuffer(buffer, dtype, shape, order) - # Unfortunately, numpy does not follow the standard, so we still - # have to set the readonly flag for it here. - array.setflags(write=isinstance(buffer, bytearray) or not buffer.readonly) - return array - - -def _numpy_ndarray_reduce(array): - # This function is implemented according to 'array_reduce_ex_picklebuffer' - # in numpy C backend. This is a workaround for python3.5 pickling support. - if sys.version_info >= (3, 8): - import pickle - picklebuf_class = pickle.PickleBuffer - elif sys.version_info >= (3, 5): - try: - import pickle5 - picklebuf_class = pickle5.PickleBuffer - except Exception: - raise ImportError("Using pickle protocol 5 requires the pickle5 " - "module for Python >=3.5 and <3.8") - else: - raise ValueError("pickle protocol 5 is not available for Python < 3.5") - # if the array if Fortran-contiguous and not C-contiguous, - # the PickleBuffer instance will hold a view on the transpose - # of the initial array, that is C-contiguous. - if not array.flags.c_contiguous and array.flags.f_contiguous: - order = "F" - picklebuf_args = array.transpose() - else: - order = "C" - picklebuf_args = array - try: - buffer = picklebuf_class(picklebuf_args) - except Exception: - # Some arrays may refuse to export a buffer, in which case - # just fall back on regular __reduce_ex__ implementation - # (gh-12745). - return array.__reduce__() - - return _numpy_frombuffer, (buffer, array.dtype, array.shape, order) - - class CloudPickler(Pickler): - """Fast C Pickler extension with additional reducing routines. - - CloudPickler's extensions exist into into: - - * its dispatch_table containing reducers that are called only if ALL - built-in saving functions were previously discarded. - * a special callback named "reducer_override", invoked before standard - function/class builtin-saving method (save_global), to serialize dynamic - functions - """ - - # cloudpickle's own dispatch_table, containing the additional set of - # objects (compared to the standard library pickle) that cloupickle can - # serialize. - dispatch = {} - dispatch[classmethod] = _classmethod_reduce - dispatch[io.TextIOWrapper] = _file_reduce - dispatch[logging.Logger] = _logger_reduce - dispatch[logging.RootLogger] = _root_logger_reduce - dispatch[memoryview] = _memoryview_reduce - dispatch[property] = _property_reduce - dispatch[staticmethod] = _classmethod_reduce - if sys.version_info[:2] >= (3, 8): - dispatch[types.CellType] = _cell_reduce - else: - dispatch[type(_make_empty_cell())] = _cell_reduce - dispatch[types.CodeType] = _code_reduce - dispatch[types.GetSetDescriptorType] = _getset_descriptor_reduce - dispatch[types.ModuleType] = _module_reduce - dispatch[types.MethodType] = _method_reduce - dispatch[types.MappingProxyType] = _mappingproxy_reduce - dispatch[weakref.WeakSet] = _weakset_reduce - dispatch[typing.TypeVar] = _typevar_reduce - - def __init__(self, file, protocol=None, buffer_callback=None): - if protocol is None: - protocol = DEFAULT_PROTOCOL - Pickler.__init__(self, file, protocol=protocol, buffer_callback=buffer_callback) - # map functions __globals__ attribute ids, to ensure that functions - # sharing the same global namespace at pickling time also share their - # global namespace at unpickling time. - self.globals_ref = {} - - # Take into account potential custom reducers registered by external - # modules - self.dispatch_table = copyreg.dispatch_table.copy() - self.dispatch_table.update(self.dispatch) - self.proto = int(protocol) - - def reducer_override(self, obj): - """Type-agnostic reducing callback for function and classes. - - For performance reasons, subclasses of the C _pickle.Pickler class - cannot register custom reducers for functions and classes in the - dispatch_table. Reducer for such types must instead implemented in the - special reducer_override method. - - Note that method will be called for any object except a few - builtin-types (int, lists, dicts etc.), which differs from reducers in - the Pickler's dispatch_table, each of them being invoked for objects of - a specific type only. - - This property comes in handy for classes: although most classes are - instances of the ``type`` metaclass, some of them can be instances of - other custom metaclasses (such as enum.EnumMeta for example). In - particular, the metaclass will likely not be known in advance, and thus - cannot be special-cased using an entry in the dispatch_table. - reducer_override, among other things, allows us to register a reducer - that will be called for any class, independently of its type. + # set of reducers defined and used by cloudpickle (private) + _dispatch_table = {} + _dispatch_table[classmethod] = _classmethod_reduce + _dispatch_table[io.TextIOWrapper] = _file_reduce + _dispatch_table[logging.Logger] = _logger_reduce + _dispatch_table[logging.RootLogger] = _root_logger_reduce + _dispatch_table[memoryview] = _memoryview_reduce + _dispatch_table[property] = _property_reduce + _dispatch_table[staticmethod] = _classmethod_reduce + _dispatch_table[CellType] = _cell_reduce + _dispatch_table[types.CodeType] = _code_reduce + _dispatch_table[types.GetSetDescriptorType] = _getset_descriptor_reduce + _dispatch_table[types.ModuleType] = _module_reduce + _dispatch_table[types.MethodType] = _method_reduce + _dispatch_table[types.MappingProxyType] = _mappingproxy_reduce + _dispatch_table[weakref.WeakSet] = _weakset_reduce + _dispatch_table[typing.TypeVar] = _typevar_reduce + _dispatch_table[_collections_abc.dict_keys] = _dict_keys_reduce + _dispatch_table[_collections_abc.dict_values] = _dict_values_reduce + _dispatch_table[_collections_abc.dict_items] = _dict_items_reduce - Notes: - - * reducer_override has the priority over dispatch_table-registered - reducers. - * reducer_override can be used to fix other limitations of cloudpickle - for other types that suffered from type-specific reducers, such as - Exceptions. See https://github.com/cloudpipe/cloudpickle/issues/248 - """ - - # This is a patch for python3.5 - if isinstance(obj, numpy.ndarray): - if (self.proto < 5 or - (not obj.flags.c_contiguous and not obj.flags.f_contiguous) or - (issubclass(type(obj), numpy.ndarray) and type(obj) is not numpy.ndarray) or - obj.dtype == "O" or obj.itemsize == 0): - return NotImplemented - return _numpy_ndarray_reduce(obj) - - t = type(obj) - try: - is_anyclass = issubclass(t, type) - except TypeError: # t is not a class (old Boost; see SF #502085) - is_anyclass = False - - if is_anyclass: - return _class_reduce(obj) - elif isinstance(obj, types.FunctionType): - return self._function_reduce(obj) - else: - # fallback to save_global, including the Pickler's distpatch_table - return NotImplemented + dispatch_table = ChainMap(_dispatch_table, copyreg.dispatch_table) # function reducers are defined as instance methods of CloudPickler # objects, as they rely on a CloudPickler attribute (globals_ref) @@ -609,7 +520,7 @@ class CloudPickler(Pickler): As opposed to cloudpickle.py, There no special handling for builtin pypy functions because cloudpickle_fast is CPython-specific. """ - if _is_importable_by_name(obj): + if _is_importable(obj): return NotImplemented else: return self._dynamic_function_reduce(obj) @@ -642,12 +553,8 @@ class CloudPickler(Pickler): if func.__closure__ is None: closure = None else: - if sys.version_info[:2] >= (3, 8): - closure = tuple( - types.CellType() for _ in range(len(code.co_freevars))) - else: - closure = tuple( - _make_empty_cell() for _ in range(len(code.co_freevars))) + closure = tuple( + _make_empty_cell() for _ in range(len(code.co_freevars))) return code, base_globals, None, None, closure @@ -660,6 +567,204 @@ class CloudPickler(Pickler): "Could not pickle object as excessively deep recursion " "required." ) - raise pickle.PicklingError(msg) + raise pickle.PicklingError(msg) from e else: raise + + if pickle.HIGHEST_PROTOCOL >= 5: + # `CloudPickler.dispatch` is only left for backward compatibility - note + # that when using protocol 5, `CloudPickler.dispatch` is not an + # extension of `Pickler.dispatch` dictionary, because CloudPickler + # subclasses the C-implemented Pickler, which does not expose a + # `dispatch` attribute. Earlier versions of the protocol 5 CloudPickler + # used `CloudPickler.dispatch` as a class-level attribute storing all + # reducers implemented by cloudpickle, but the attribute name was not a + # great choice given the meaning of `Cloudpickler.dispatch` when + # `CloudPickler` extends the pure-python pickler. + dispatch = dispatch_table + + # Implementation of the reducer_override callback, in order to + # efficiently serialize dynamic functions and classes by subclassing + # the C-implemented Pickler. + # TODO: decorrelate reducer_override (which is tied to CPython's + # implementation - would it make sense to backport it to pypy? - and + # pickle's protocol 5 which is implementation agnostic. Currently, the + # availability of both notions coincide on CPython's pickle and the + # pickle5 backport, but it may not be the case anymore when pypy + # implements protocol 5 + def __init__(self, file, protocol=None, buffer_callback=None): + if protocol is None: + protocol = DEFAULT_PROTOCOL + Pickler.__init__( + self, file, protocol=protocol, buffer_callback=buffer_callback + ) + # map functions __globals__ attribute ids, to ensure that functions + # sharing the same global namespace at pickling time also share + # their global namespace at unpickling time. + self.globals_ref = {} + self.proto = int(protocol) + + def reducer_override(self, obj): + """Type-agnostic reducing callback for function and classes. + + For performance reasons, subclasses of the C _pickle.Pickler class + cannot register custom reducers for functions and classes in the + dispatch_table. Reducer for such types must instead implemented in + the special reducer_override method. + + Note that method will be called for any object except a few + builtin-types (int, lists, dicts etc.), which differs from reducers + in the Pickler's dispatch_table, each of them being invoked for + objects of a specific type only. + + This property comes in handy for classes: although most classes are + instances of the ``type`` metaclass, some of them can be instances + of other custom metaclasses (such as enum.EnumMeta for example). In + particular, the metaclass will likely not be known in advance, and + thus cannot be special-cased using an entry in the dispatch_table. + reducer_override, among other things, allows us to register a + reducer that will be called for any class, independently of its + type. + + + Notes: + + * reducer_override has the priority over dispatch_table-registered + reducers. + * reducer_override can be used to fix other limitations of + cloudpickle for other types that suffered from type-specific + reducers, such as Exceptions. See + https://github.com/cloudpipe/cloudpickle/issues/248 + """ + if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj): # noqa # pragma: no branch + return ( + _create_parametrized_type_hint, + parametrized_type_hint_getinitargs(obj) + ) + t = type(obj) + try: + is_anyclass = issubclass(t, type) + except TypeError: # t is not a class (old Boost; see SF #502085) + is_anyclass = False + + if is_anyclass: + return _class_reduce(obj) + elif isinstance(obj, types.FunctionType): + return self._function_reduce(obj) + else: + # fallback to save_global, including the Pickler's + # distpatch_table + return NotImplemented + + else: + # When reducer_override is not available, hack the pure-Python + # Pickler's types.FunctionType and type savers. Note: the type saver + # must override Pickler.save_global, because pickle.py contains a + # hard-coded call to save_global when pickling meta-classes. + dispatch = Pickler.dispatch.copy() + + def __init__(self, file, protocol=None): + if protocol is None: + protocol = DEFAULT_PROTOCOL + Pickler.__init__(self, file, protocol=protocol) + # map functions __globals__ attribute ids, to ensure that functions + # sharing the same global namespace at pickling time also share + # their global namespace at unpickling time. + self.globals_ref = {} + assert hasattr(self, 'proto') + + def _save_reduce_pickle5(self, func, args, state=None, listitems=None, + dictitems=None, state_setter=None, obj=None): + save = self.save + write = self.write + self.save_reduce( + func, args, state=None, listitems=listitems, + dictitems=dictitems, obj=obj + ) + # backport of the Python 3.8 state_setter pickle operations + save(state_setter) + save(obj) # simple BINGET opcode as obj is already memoized. + save(state) + write(pickle.TUPLE2) + # Trigger a state_setter(obj, state) function call. + write(pickle.REDUCE) + # The purpose of state_setter is to carry-out an + # inplace modification of obj. We do not care about what the + # method might return, so its output is eventually removed from + # the stack. + write(pickle.POP) + + def save_global(self, obj, name=None, pack=struct.pack): + """ + Save a "global". + + The name of this method is somewhat misleading: all types get + dispatched here. + """ + if obj is type(None): # noqa + return self.save_reduce(type, (None,), obj=obj) + elif obj is type(Ellipsis): + return self.save_reduce(type, (Ellipsis,), obj=obj) + elif obj is type(NotImplemented): + return self.save_reduce(type, (NotImplemented,), obj=obj) + elif obj in _BUILTIN_TYPE_NAMES: + return self.save_reduce( + _builtin_type, (_BUILTIN_TYPE_NAMES[obj],), obj=obj) + + if sys.version_info[:2] < (3, 7) and _is_parametrized_type_hint(obj): # noqa # pragma: no branch + # Parametrized typing constructs in Python < 3.7 are not + # compatible with type checks and ``isinstance`` semantics. For + # this reason, it is easier to detect them using a + # duck-typing-based check (``_is_parametrized_type_hint``) than + # to populate the Pickler's dispatch with type-specific savers. + self.save_reduce( + _create_parametrized_type_hint, + parametrized_type_hint_getinitargs(obj), + obj=obj + ) + elif name is not None: + Pickler.save_global(self, obj, name=name) + elif not _is_importable(obj, name=name): + self._save_reduce_pickle5(*_dynamic_class_reduce(obj), obj=obj) + else: + Pickler.save_global(self, obj, name=name) + dispatch[type] = save_global + + def save_function(self, obj, name=None): + """ Registered with the dispatch to handle all function types. + + Determines what kind of function obj is (e.g. lambda, defined at + interactive prompt, etc) and handles the pickling appropriately. + """ + if _is_importable(obj, name=name): + return Pickler.save_global(self, obj, name=name) + elif PYPY and isinstance(obj.__code__, builtin_code_type): + return self.save_pypy_builtin_func(obj) + else: + return self._save_reduce_pickle5( + *self._dynamic_function_reduce(obj), obj=obj + ) + + def save_pypy_builtin_func(self, obj): + """Save pypy equivalent of builtin functions. + PyPy does not have the concept of builtin-functions. Instead, + builtin-functions are simple function instances, but with a + builtin-code attribute. + Most of the time, builtin functions should be pickled by attribute. + But PyPy has flaky support for __qualname__, so some builtin + functions such as float.__new__ will be classified as dynamic. For + this reason only, we created this special routine. Because + builtin-functions are not expected to have closure or globals, + there is no additional hack (compared the one already implemented + in pickle) to protect ourselves from reference cycles. A simple + (reconstructor, newargs, obj.__dict__) tuple is save_reduced. Note + also that PyPy improved their support for __qualname__ in v3.6, so + this routing should be removed when cloudpickle supports only PyPy + 3.6 and later. + """ + rv = (types.FunctionType, (obj.__code__, {}, obj.__name__, + obj.__defaults__, obj.__closure__), + obj.__dict__) + self.save_reduce(*rv, obj=obj) + + dispatch[types.FunctionType] = save_function diff --git a/python/ray/cloudpickle/compat.py b/python/ray/cloudpickle/compat.py new file mode 100644 index 000000000..afa285f62 --- /dev/null +++ b/python/ray/cloudpickle/compat.py @@ -0,0 +1,13 @@ +import sys + + +if sys.version_info < (3, 8): + try: + import pickle5 as pickle # noqa: F401 + from pickle5 import Pickler # noqa: F401 + except ImportError: + import pickle # noqa: F401 + from pickle import _Pickler as Pickler # noqa: F401 +else: + import pickle # noqa: F401 + from _pickle import Pickler # noqa: F401 diff --git a/python/ray/includes/serialization.pxi b/python/ray/includes/serialization.pxi index 31a5f1e04..f20165fc1 100644 --- a/python/ray/includes/serialization.pxi +++ b/python/ray/includes/serialization.pxi @@ -1,6 +1,5 @@ from libc.string cimport memcpy from libc.stdint cimport uintptr_t, uint64_t, INT32_MAX -from libcpp cimport nullptr import cython DEF MEMCOPY_THREADS = 6 @@ -116,6 +115,9 @@ cdef class SubBuffer: self.buf, self.len) def __getbuffer__(self, Py_buffer* buffer, int flags): + if flags & cpython.PyBUF_WRITABLE: + # Ray ensures all buffers are immutable. + raise BufferError buffer.readonly = self.readonly buffer.buf = self.buf buffer.format = self._format.c_str() diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 7b7e305cb..42beaa1bc 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -55,6 +55,7 @@ py_test_module_list( "test_reconstruction.py", "test_reference_counting_2.py", "test_reference_counting.py", + "test_serialization.py", "test_stress.py", "test_stress_sharded.py", "test_unreconstructable_errors.py", @@ -89,7 +90,6 @@ py_test_module_list( "test_node_manager.py", "test_numba.py", "test_ray_init.py", - "test_serialization.py", "test_tempfile.py", "test_webui.py", ], diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 63837c8a2..203e484fc 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1,11 +1,9 @@ # coding: utf-8 -import io import logging import os import pickle import sys import time -import weakref import numpy as np import pytest @@ -416,51 +414,6 @@ def test_ray_recursive_objects(ray_start_shared_local_modes): ray.put(obj) -def test_reducer_override_no_reference_cycle(ray_start_shared_local_modes): - # bpo-39492: reducer_override used to induce a spurious reference cycle - # inside the Pickler object, that could prevent all serialized objects - # from being garbage-collected without explicity invoking gc.collect. - - # test a dynamic function - def f(): - return 4669201609102990671853203821578 - - wr = weakref.ref(f) - - bio = io.BytesIO() - from ray.cloudpickle import CloudPickler, loads, dumps - p = CloudPickler(bio, protocol=5) - p.dump(f) - new_f = loads(bio.getvalue()) - assert new_f() == 4669201609102990671853203821578 - - del p - del f - - assert wr() is None - - # test a dynamic class - class ShortlivedObject: - def __del__(self): - print("Went out of scope!") - - obj = ShortlivedObject() - new_obj = weakref.ref(obj) - - dumps(obj) - del obj - assert new_obj() is None - - -def test_deserialized_from_buffer_immutable(ray_start_shared_local_modes): - x = np.full((2, 2), 1.) - o = ray.put(x) - y = ray.get(o) - with pytest.raises( - ValueError, match="assignment destination is read-only"): - y[0, 0] = 9. - - def test_passing_arguments_by_value_out_of_the_box( ray_start_shared_local_modes): @ray.remote diff --git a/python/ray/tests/test_serialization.py b/python/ray/tests/test_serialization.py index 922c72937..8004e694d 100644 --- a/python/ray/tests/test_serialization.py +++ b/python/ray/tests/test_serialization.py @@ -5,6 +5,7 @@ import logging import re import string import sys +import weakref import numpy as np import pytest @@ -497,6 +498,51 @@ def test_register_class(ray_start_2_cpus): assert not hasattr(c2, "method1") +def test_deserialized_from_buffer_immutable(ray_start_shared_local_modes): + x = np.full((2, 2), 1.) + o = ray.put(x) + y = ray.get(o) + with pytest.raises( + ValueError, match="assignment destination is read-only"): + y[0, 0] = 9. + + +def test_reducer_override_no_reference_cycle(ray_start_shared_local_modes): + # bpo-39492: reducer_override used to induce a spurious reference cycle + # inside the Pickler object, that could prevent all serialized objects + # from being garbage-collected without explicity invoking gc.collect. + + # test a dynamic function + def f(): + return 4669201609102990671853203821578 + + wr = weakref.ref(f) + + bio = io.BytesIO() + from ray.cloudpickle import CloudPickler, loads, dumps + p = CloudPickler(bio, protocol=5) + p.dump(f) + new_f = loads(bio.getvalue()) + assert new_f() == 4669201609102990671853203821578 + + del p + del f + + assert wr() is None + + # test a dynamic class + class ShortlivedObject: + def __del__(self): + print("Went out of scope!") + + obj = ShortlivedObject() + new_obj = weakref.ref(obj) + + dumps(obj) + del obj + assert new_obj() is None + + if __name__ == "__main__": import pytest sys.exit(pytest.main(["-v", __file__]))