mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
This reverts commit 3e53619d64
.
This commit is contained in:
parent
195cdcf5b8
commit
4069686e0f
8 changed files with 28 additions and 66 deletions
|
@ -36,7 +36,7 @@ Install Ray with: ``pip install ray``. For nightly wheels, see the
|
|||
.. _`Dask`: https://docs.ray.io/en/master/dask-on-ray.html
|
||||
.. _`Horovod`: https://horovod.readthedocs.io/en/stable/ray_include.html
|
||||
.. _`Scikit-learn`: joblib.html
|
||||
.. _`Datasets`: https://docs.ray.io/en/master/data/dataset.html
|
||||
|
||||
|
||||
|
||||
Quick Start
|
||||
|
|
|
@ -314,11 +314,6 @@ Datasets can read and write in parallel to `custom datasources <package-ref.html
|
|||
# Write to a custom datasource.
|
||||
ds.write_datasource(YourCustomDatasource(), **write_args)
|
||||
|
||||
Tensor-typed values
|
||||
-------------------
|
||||
|
||||
Currently Datasets does not have native support for tensor-typed values in records (e.g., TFRecord / Petastorm format / multi-dimensional arrays). This is planned for development.
|
||||
|
||||
Pipelining data processing and ML computations
|
||||
----------------------------------------------
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ from ray import ray_constants
|
|||
from ray import cloudpickle as pickle
|
||||
from ray._raylet import PythonFunctionDescriptor
|
||||
from ray._private.utils import (
|
||||
check_oversized_function,
|
||||
check_oversized_pickle,
|
||||
decode,
|
||||
ensure_str,
|
||||
format_error_message,
|
||||
|
@ -136,9 +136,9 @@ class FunctionActorManager:
|
|||
function = remote_function._function
|
||||
pickled_function = pickle.dumps(function)
|
||||
|
||||
check_oversized_function(pickled_function,
|
||||
remote_function._function_name,
|
||||
"remote function", self._worker)
|
||||
check_oversized_pickle(pickled_function,
|
||||
remote_function._function_name,
|
||||
"remote function", self._worker)
|
||||
key = (b"RemoteFunction:" + self._worker.current_job_id.binary() + b":"
|
||||
+ remote_function._function_descriptor.function_id.binary())
|
||||
self._worker.redis_client.hset(
|
||||
|
@ -367,9 +367,9 @@ class FunctionActorManager:
|
|||
"actor_method_names": json.dumps(list(actor_method_names))
|
||||
}
|
||||
|
||||
check_oversized_function(actor_class_info["class"],
|
||||
actor_class_info["class_name"], "actor",
|
||||
self._worker)
|
||||
check_oversized_pickle(actor_class_info["class"],
|
||||
actor_class_info["class_name"], "actor",
|
||||
self._worker)
|
||||
|
||||
self._publish_actor_class_to_key(key, actor_class_info)
|
||||
# TODO(rkn): Currently we allow actor classes to be defined
|
||||
|
|
|
@ -592,40 +592,29 @@ def get_shared_memory_bytes():
|
|||
return shm_avail
|
||||
|
||||
|
||||
def check_oversized_function(pickled, name, obj_type, worker):
|
||||
"""Send a warning message if the pickled function is too large.
|
||||
def check_oversized_pickle(pickled, name, obj_type, worker):
|
||||
"""Send a warning message if the pickled object is too large.
|
||||
|
||||
Args:
|
||||
pickled: the pickled function.
|
||||
pickled: the pickled object.
|
||||
name: name of the pickled object.
|
||||
obj_type: type of the pickled object, can be 'function',
|
||||
'remote function', or 'actor'.
|
||||
'remote function', 'actor', or 'object'.
|
||||
worker: the worker used to send warning message.
|
||||
"""
|
||||
length = len(pickled)
|
||||
if length <= ray_constants.FUNCTION_SIZE_WARN_THRESHOLD:
|
||||
if length <= ray_constants.PICKLE_OBJECT_WARNING_SIZE:
|
||||
return
|
||||
elif length < ray_constants.FUNCTION_SIZE_ERROR_THRESHOLD:
|
||||
warning_message = (
|
||||
"The {} {} is very large ({} MiB). "
|
||||
"Check that its definition is not implicitly capturing a large "
|
||||
"array or other object in scope. Tip: use ray.put() to put large "
|
||||
"objects in the Ray object store.").format(obj_type, name,
|
||||
length // (1024 * 1024))
|
||||
push_error_to_driver(
|
||||
worker,
|
||||
ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR,
|
||||
"Warning: " + warning_message,
|
||||
job_id=worker.current_job_id)
|
||||
else:
|
||||
error = (
|
||||
"The {} {} is too large ({} MiB > FUNCTION_SIZE_ERROR_THRESHOLD={}"
|
||||
" MiB). Check that its definition is not implicitly capturing a "
|
||||
"large array or other object in scope. Tip: use ray.put() to "
|
||||
"put large objects in the Ray object store.").format(
|
||||
obj_type, name, length // (1024 * 1024),
|
||||
ray_constants.FUNCTION_SIZE_ERROR_THRESHOLD // (1024 * 1024))
|
||||
raise ValueError(error)
|
||||
warning_message = (
|
||||
"Warning: The {} {} has size {} when pickled. "
|
||||
"It will be stored in Redis, which could cause memory issues. "
|
||||
"This may mean that its definition uses a large array or other object."
|
||||
).format(obj_type, name, length)
|
||||
push_error_to_driver(
|
||||
worker,
|
||||
ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR,
|
||||
warning_message,
|
||||
job_id=worker.current_job_id)
|
||||
|
||||
|
||||
def is_main_thread():
|
||||
|
|
|
@ -69,9 +69,7 @@ DEFAULT_ACTOR_METHOD_NUM_RETURN_VALS = 1
|
|||
|
||||
# If a remote function or actor (or some other export) has serialized size
|
||||
# greater than this quantity, print an warning.
|
||||
FUNCTION_SIZE_WARN_THRESHOLD = 10**7
|
||||
FUNCTION_SIZE_ERROR_THRESHOLD = env_integer("FUNCTION_SIZE_ERROR_THRESHOLD",
|
||||
(10**8))
|
||||
PICKLE_OBJECT_WARNING_SIZE = 10**7
|
||||
|
||||
# If remote functions with the same source are imported this many times, then
|
||||
# print a warning.
|
||||
|
|
|
@ -619,25 +619,6 @@ def test_args_named_and_star(ray_start_shared_local_modes):
|
|||
ray.get(remote_test_function.remote(local_method, actor_method))
|
||||
|
||||
|
||||
def test_oversized_function(ray_start_shared_local_modes):
|
||||
bar = np.zeros(100 * 1024 * 1024)
|
||||
|
||||
@ray.remote
|
||||
class Actor:
|
||||
def foo(self):
|
||||
return len(bar)
|
||||
|
||||
@ray.remote
|
||||
def f():
|
||||
return len(bar)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
f.remote()
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
Actor.remote()
|
||||
|
||||
|
||||
def test_args_stars_after(ray_start_shared_local_modes):
|
||||
def star_args_after(a="hello", b="heo", *args, **kwargs):
|
||||
return a, b, args, kwargs
|
||||
|
|
|
@ -524,8 +524,7 @@ def test_export_large_objects(ray_start_regular, error_pubsub):
|
|||
p = error_pubsub
|
||||
import ray.ray_constants as ray_constants
|
||||
|
||||
large_object = np.zeros(
|
||||
2 * ray_constants.FUNCTION_SIZE_WARN_THRESHOLD, dtype=np.uint8)
|
||||
large_object = np.zeros(2 * ray_constants.PICKLE_OBJECT_WARNING_SIZE)
|
||||
|
||||
@ray.remote
|
||||
def f():
|
||||
|
|
|
@ -53,7 +53,7 @@ from ray.exceptions import (
|
|||
from ray._private.function_manager import FunctionActorManager
|
||||
from ray._private.ray_logging import setup_logger
|
||||
from ray._private.ray_logging import global_worker_stdstream_dispatcher
|
||||
from ray._private.utils import check_oversized_function
|
||||
from ray._private.utils import check_oversized_pickle
|
||||
from ray.util.inspect import is_cython
|
||||
from ray.experimental.internal_kv import _internal_kv_get, \
|
||||
_internal_kv_initialized
|
||||
|
@ -392,8 +392,8 @@ class Worker:
|
|||
# we don't need to export it again.
|
||||
return
|
||||
|
||||
check_oversized_function(pickled_function, function.__name__,
|
||||
"function", self)
|
||||
check_oversized_pickle(pickled_function, function.__name__,
|
||||
"function", self)
|
||||
|
||||
# Run the function on all workers.
|
||||
self.redis_client.hset(
|
||||
|
|
Loading…
Add table
Reference in a new issue