Signed-off-by: Yi Cheng <chengyidna@gmail.com>
## Why are these changes needed?
When an actor dies, a notification is sent to the core workers. Currently, the core worker sometimes queues tasks while waiting for the actor death info and pops them later for better usability. In async cases, however, this causes issues.
The failure callback may submit tasks that require holding the lock, but the lock is already held, which leads to a deadlock.
This PR fixes this by moving the failure handling out of the lock.
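The pattern, sketched in Python rather than the actual core-worker C++ (all names here are illustrative, not from the PR):
```python
import threading

lock = threading.Lock()  # non-reentrant, like the core worker's mutex

def submit_task(task):
    with lock:            # task submission needs the same lock
        print(f"submitted {task}")

def handle_actor_death_buggy(failure_callback):
    with lock:
        failure_callback()  # callback calls submit_task() -> tries to
                            # re-acquire `lock` -> deadlock

def handle_actor_death_fixed(failure_callback):
    with lock:
        pass                # update only the state that needs the lock
    failure_callback()      # run the failure handling outside the lock,
                            # so any tasks it submits can acquire it safely
```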
Ray 2.0 API deprecation for the following (a migration sketch for the `placement_group` arguments follows the list):
- `ray.remote()`: `placement_group`
- `ray.remote()`: `placement_group_bundle_index`
- `ray.remote()`: `placement_group_capture_child_tasks`
- `ray.get_dashboard_url()`
- `ray.get_resource_ids()`
- `ray.disconnect()`
- `ray.connect()`
- `ray.util.ActorGroup`
- `ray.util.ActorPool`
- Add `get_xx_id()` methods that return a hex string (rather than an object), then deprecate the corresponding `xx_id()` methods (which return Cython objects); the `xx` here can be node, task, etc.
- `ray start`: `--plasma-store-socket-name`
- `ray start`: `--raylet-socket-name`
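For illustration, the deprecated `placement_group*` arguments to `ray.remote()` map onto the scheduling-strategy API. This is a hedged sketch of the migration path, not text from the PR:
```python
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()
pg = placement_group([{"CPU": 1}])
ray.get(pg.ready())

# Old (deprecated): @ray.remote(placement_group=pg, placement_group_bundle_index=0)
@ray.remote(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg, placement_group_bundle_index=0
    )
)
def f():
    return "scheduled inside the placement group"

print(ray.get(f.remote()))
```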
## Why are these changes needed?
Introduce a stable version of split-with-hints, with a stable equalizing algorithm (sketched below, after this list):
1. Use the greedy algorithm to generate the initial, unbalanced splits.
2. For each split, first shave it so that its number of rows is below the `target_size`.
3. Based on how many rows each split still needs, do a one-time `split_at_index` on the leftover blocks.
4. Merge the shaved splits with the leftover splits.
The guarantee of this algorithm is that we need to split at most O(number of splits) blocks.
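A simplified Python sketch of the shave-and-redistribute idea, assuming blocks are plain lists of rows and a hypothetical `split_at_index` helper that cuts one block at a row offset (the real implementation operates on block references and metadata):
```python
from typing import List, Tuple

Block = List[int]  # a block is just a list of rows in this sketch


def split_at_index(block: Block, index: int) -> Tuple[Block, Block]:
    """Split a single block into two at the given row offset."""
    return block[:index], block[index:]


def equalize(splits: List[List[Block]], target_size: int) -> List[List[Block]]:
    leftovers: List[Block] = []
    shaved: List[List[Block]] = []

    # 1. Shave each split down to at most `target_size` rows,
    #    moving the removed rows into the leftover pool.
    for split in splits:
        kept: List[Block] = []
        num_rows = 0
        for block in split:
            if num_rows + len(block) <= target_size:
                kept.append(block)
                num_rows += len(block)
            else:
                head, tail = split_at_index(block, target_size - num_rows)
                if head:
                    kept.append(head)
                    num_rows += len(head)
                if tail:
                    leftovers.append(tail)
        shaved.append(kept)

    # 2. Give each shaved split the rows it is still missing, using at most
    #    one split_at_index per consumed leftover block.
    for kept in shaved:
        needed = target_size - sum(len(b) for b in kept)
        while needed > 0 and leftovers:
            block = leftovers.pop()
            if len(block) <= needed:
                kept.append(block)
                needed -= len(block)
            else:
                head, tail = split_at_index(block, needed)
                kept.append(head)
                leftovers.append(tail)
                needed = 0
    return shaved


# Example: three uneven splits equalized to 3 rows each.
print(equalize([[[1, 2, 3, 4, 5]], [[6, 7]], [[8, 9]]], target_size=3))
```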
## Why are these changes needed?
Consumers (e.g. Train) may expect generated batches to all be the same size. Prior to this change, the default behavior was for each batch to be one block, and blocks may have different sizes.
## Changes
- Set the default `batch_size` to 256. This was chosen as a sensible default for training workloads and is intentionally different from the existing default `batch_size` for `Dataset.map_batches`.
- Update the docs for `Dataset.iter_batches`, `Dataset.map_batches`, and `DatasetPipeline.iter_batches` to be consistent.
- Update tests and examples to explicitly pass `batch_size=None`, since those tests intentionally exercise block iteration; other tests cover explicit batch sizes.
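For example (a usage sketch; the dataset and printed sizes are illustrative):
```python
import ray

ds = ray.data.range(1000)

# With the new default, batches are capped at 256 rows each.
for batch in ds.iter_batches(batch_format="pandas"):
    print(len(batch))  # e.g. 256, 256, 256, 232

# Passing batch_size=None keeps the old one-batch-per-block behavior.
for batch in ds.iter_batches(batch_size=None, batch_format="pandas"):
    print(len(batch))  # one batch per block; sizes vary
```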
`ray.init()` will currently start a new Ray instance even if one already exists, which is very confusing if you are a new user trying to go from local development to a cluster. This PR changes it so that, when no address is specified, we first try to find an existing Ray cluster that was created through `ray start`. If none is found, we start a new one.
This makes two changes to the `ray.init()` resolution order (see the sketch after this list):
1. When `ray start` is called, the started cluster's address is written to a file called `/tmp/ray/ray_current_cluster`. For `ray.init()` and `ray.init(address="auto")`, we first check this local file for an existing cluster address. The file is deleted on `ray stop`. If the file is empty, we autodetect any running cluster (legacy behavior) when `address="auto"`, or start a new local Ray instance when `address=None`.
2. When `ray.init(address="local")` is called, we create a new local Ray instance even if one already exists. This behavior seems to be necessary mainly for `ray.client` use cases.
This also surfaces the logs about which Ray instance we are connecting to. Previously these were hidden because we didn't set up the log until after connecting to Ray. So now Ray will log one of the following messages during ray.init:
```
(Connecting to existing Ray cluster at address: <IP>...)
...connection...
(Started a local Ray cluster.| Connected to Ray Cluster.)( View the dashboard at <URL>)
```
Note that this changes the dashboard URL to be printed with `ray.init()` instead of when the dashboard is first started.
Co-authored-by: Eric Liang <ekhliang@gmail.com>
This PR tries to automatically cast tensor columns to our TensorArray extension type when building Pandas blocks, logging a warning and falling back to the opaque object-typed column if the cast fails. This should allow users to remain mostly tensor extension agnostic.
TensorArray now eagerly validates the underlying tensor data, raising an error if e.g. the underlying ndarrays have heterogeneous shapes; previously, TensorArray wouldn't validate this on construction and would instead let failures happen downstream. This means that our internal TensorArray use needs to follow a try-except pattern, falling back to a plain NumPy object column.
This PR adds `.iter_torch_batches()` and `.iter_tf_batches()` convenience APIs, which take care of ML framework tensor conversion over the narrow tensor waist of the `.iter_batches()` call (the "numpy" format), and unify batch formats around two options: a single tensor for simple/pure-tensor/single-column datasets, and a dictionary of tensors for multi-column datasets.
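A usage sketch of the two batch formats for the Torch variant (the dataset contents are placeholders):
```python
import ray

# Single-column / pure-tensor dataset -> each batch is a single torch.Tensor.
ds = ray.data.range_tensor(1000, shape=(2, 2))
for batch in ds.iter_torch_batches(batch_size=256):
    print(batch.shape)  # e.g. torch.Size([256, 2, 2]) for full batches

# Multi-column dataset -> each batch is a dict of column name -> torch.Tensor.
ds2 = ray.data.from_items([{"x": i, "y": 2 * i} for i in range(1000)])
for batch in ds2.iter_torch_batches(batch_size=256):
    print(batch["x"].shape, batch["y"].shape)
```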
We don't have a way to specify resource requirements with the `Tuner()` API. This PR introduces `tune.with_resources()` to attach a resource request to class and function trainables. For class trainables, it will override any existing default resource request.
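A usage sketch with a function trainable (the training function, metric, and resource numbers are made up):
```python
from ray import tune
from ray.air import session


def train_fn(config):
    # Placeholder training logic; report a dummy metric.
    session.report({"loss": config["lr"]})


# Attach a resource request to the function trainable, then pass it to Tuner.
trainable = tune.with_resources(train_fn, {"cpu": 2, "gpu": 1})
tuner = tune.Tuner(
    trainable,
    param_space={"lr": tune.grid_search([0.01, 0.1])},
)
results = tuner.fit()
```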
Signed-off-by: Kai Fricke <kai@anyscale.com>
Having an indicator of who is running the stage and who created a block list will enable eager memory releasing.
This is an alternative to https://github.com/ray-project/ray/pull/26196 with better abstraction.
Note: this doesn't work for `Dataset.split()` yet; that will be done in a follow-up PR.