Make it more convenient to develop ray.data by setting RAY_EXPERIMENTAL_DATA_API=1 (#16685)

* make it convenient to import ray.data

* update

* Update python/ray/experimental/data/read_api.py

Co-authored-by: Alex Wu <itswu.alex@gmail.com>

Authored by Eric Liang on 2021-06-26 09:17:30 -07:00; committed by GitHub
parent 6bfa97eed7
commit aa882ed52d
3 changed files with 28 additions and 3 deletions

python/ray/__init__.py

@@ -1,3 +1,4 @@
+import os
 import logging

 logger = logging.getLogger(__name__)
@@ -150,6 +151,10 @@ __all__ += [
     "PlacementGroupID",
 ]

+if "RAY_EXPERIMENTAL_DATA_API" in os.environ:
+    from ray.experimental import data
+    __all__.append(data)
+
 # Remove modules from top-level ray
 def _ray_user_setup_function():
@@ -170,5 +175,6 @@ def _ray_user_setup_function():
 _ray_user_setup_function()

+del os
 del logging
 del _ray_user_setup_function
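
With this change, setting the environment variable before the first import of ray exposes the
experimental dataset package as ray.data. A minimal usage sketch, assuming ray.experimental.data
re-exports the read API (for example, range from read_api.py):

    import os
    os.environ["RAY_EXPERIMENTAL_DATA_API"] = "1"  # must be set before importing ray

    import ray

    # ray/__init__.py saw the flag, so the experimental package is bound as ray.data.
    ds = ray.data.range(100)  # range is assumed to be re-exported by ray.experimental.data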

python/ray/experimental/data/dataset.py

@@ -196,13 +196,16 @@ class Dataset(Generic[T]):
         return Dataset(new_blocks)

     def sort(self,
-             key: Union[str, List[str], Callable[[T], Any]],
+             key: Union[None, str, List[str], Callable[[T], Any]],
              descending: bool = False) -> "Dataset[T]":
         """Sort the dataset by the specified key columns or key function.

         This is a blocking operation.

         Examples:
+            # Sort using the entire record as the key.
+            >>> ds.sort()
+
             # Sort by a single column.
             >>> ds.sort("field1")
@@ -216,8 +219,8 @@

         Args:
             key: Either a single Arrow column name, a list of Arrow column
-                names, or a function that returns a sortable key given each
-                record as an input.
+                names, a function that returns a sortable key given each
+                record as an input, or None to sort by the entire record.
             descending: Whether to sort in descending order.

         Returns:
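
A hedged sketch of the widened sort API: with the new None option (and, per the new docstring
example, a plain ds.sort()), the entire record is the sort key, while a column name, list of
names, or key function behaves as before. The column name "field1" is illustrative only:

    ds = ray.data.range(1000)                   # assumes ray.data is enabled as shown above
    ds.sort()                                   # sort by the entire record
    ds.sort(lambda r: r % 10, descending=True)  # callable key, descending order
    # For Arrow datasets: ds.sort("field1") or ds.sort(["field1", "field2"])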

python/ray/experimental/data/read_api.py

@@ -14,6 +14,16 @@ from ray.experimental.data.impl.block import ObjectRef, ListBlock, Block
 from ray.experimental.data.impl.arrow_block import ArrowBlock, ArrowRow


+def autoinit_ray(f):
+    def wrapped(*a, **kw):
+        if not ray.is_initialized():
+            ray.client().connect()
+        return f(*a, **kw)
+
+    return wrapped
+
+
+@autoinit_ray
 def from_items(items: List[Any], parallelism: int = 200) -> Dataset[Any]:
     """Create a dataset from a list of local Python objects.
@@ -41,6 +51,7 @@ def from_items(items: List[Any], parallelism: int = 200) -> Dataset[Any]:
     return Dataset(blocks)


+@autoinit_ray
 def range(n: int, parallelism: int = 200) -> Dataset[int]:
     """Create a dataset from a range of integers [0..n).
@@ -72,6 +83,7 @@ def range(n: int, parallelism: int = 200) -> Dataset[int]:
     return Dataset(blocks)


+@autoinit_ray
 def range_arrow(n: int, parallelism: int = 200) -> Dataset[ArrowRow]:
     """Create an Arrow dataset from a range of integers [0..n).
@@ -108,6 +120,7 @@ def range_arrow(n: int, parallelism: int = 200) -> Dataset[ArrowRow]:
     return Dataset(blocks)


+@autoinit_ray
 def read_parquet(paths: Union[str, List[str]],
                  filesystem: Optional["pyarrow.fs.FileSystem"] = None,
                  columns: Optional[List[str]] = None,
@@ -162,6 +175,7 @@ def read_parquet(paths: Union[str, List[str]],
     return Dataset([gen_read.remote(ps) for ps in nonempty_tasks])


+@autoinit_ray
 def read_json(paths: Union[str, List[str]],
               filesystem: Optional["pyarrow.fs.FileSystem"] = None,
               parallelism: int = 200,
@@ -187,6 +201,7 @@ def read_json(paths: Union[str, List[str]],
     raise NotImplementedError  # P0


+@autoinit_ray
 def read_csv(paths: Union[str, List[str]],
              filesystem: Optional["pyarrow.fs.FileSystem"] = None,
              parallelism: int = 200,
@@ -212,6 +227,7 @@ def read_csv(paths: Union[str, List[str]],
     raise NotImplementedError  # P0


+@autoinit_ray
 def read_binary_files(
         paths: Union[str, List[str]],
         include_paths: bool = False,
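
The effect of the autoinit_ray decorator is that the first call into the read API connects to Ray
on the caller's behalf, so no explicit ray.init() or ray.client().connect() is needed. A small
sketch of the intended behavior, under the same ray.data assumptions as above:

    import os
    os.environ["RAY_EXPERIMENTAL_DATA_API"] = "1"
    import ray

    # No explicit init: the decorator connects when the first read function runs.
    ds = ray.data.range(1000)
    # Subsequent decorated calls see ray.is_initialized() as true and skip the connect.
    ds2 = ray.data.range_arrow(1000)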