This PR adds 2 more states into TaskStatus
enum TaskStatus {
// The task is scheduled properly and waiting for execution.
// It includes time to deliver the task to the remote worker + queueing time
// from the execution side.
WAITING_FOR_EXECUTION = 5;
// The task that is running.
RUNNING = 6;
}
## Why are these changes needed?
<!-- Please give a short summary of the change and the problem this solves. -->
This PR fixes the path to resubscribe to GCS when GCS restarts.
When GCS restarts, it'll lose all subscription information since everything is stored in memory. Then in the runtime, we need to tell GCS what's currently being subscribed.
The previous method:
- We'll have a thread in core worker/raylet to check whether the GCS restarted or not.
- If it restarted, we'll send resubscribe request to GCS.
However, this is not working in these cases:
- GCS restarts happen so fast so the checker in raylet/core worker missed them.
- GCS doesn't restart, but just being lag due to network issues then, the resubscribe is not necessary.
Actually, GCS knows when a resubscribe is needed: when it restarts. So the PR here is to send a resubscribe request from GCS -> Raylet and Raylet will do the resubscription.
There are two parts for this to work:
- [x] raylet send resubscription
- [ ] raylet ask core worker to send resubscription
Providing easy-access datasets is table stakes for a good Getting Started UX, but even with good in-package data, it can be difficult to make these paths accessible to the user. This PR adds an "example://" protocol that will resolve passed paths directly to our canned in-package example data.
Currently, when an actor has `max_restarts` > 0 and has crashed, the actor will enter RESTARTING state and then ALIVE. Imagine this scenario: an online service provides HTTP service and the proxy actor receives requests, forwards them to worker actors, and replies to clients with the execution results from worker actors.
```
-> Worker A (actor)
/
/
HTTP requests -------> Proxy (actor with HTTP server) ---> Worker B (actor)
\
\
-> ...
```
For each HTTP request, the proxy picks one worker (e.g. worker A) based on some algorithm, sends the request to it, and calls `ray.get()` to wait for the result. If for some reason the picked worker crashed, Ray will restart the actor, and `ray.get()` will throw an error. The proxy may pick another worker (e.g. worker B) and re-send the request to it. This is OK.
But new requests keep coming. The proxy may pick worker A again. But because worker A is still in RESTARTING state, it's not ready to serve requests. `ray.get()` on subsequent requests sent to worker A will hang until worker A is back online (ALIVE state). The proxy won't be able to reschedule these requests to another worker because currently there's no way to know if worker A is alive or not before sending a request. We can't say worker A is not alive just based on whether `ray.get()` hangs either.
To solve this issue, we change the semantics of `max_task_retries`.
* When max_task_retries is 0 (which is the default value), if the callee actor is in the RESTARTING state, subsequently submitted tasks will fail immediately with a RayActorError. Users can catch the RayActorError and implement their own fallback strategies to improve service availability and mitigate service outages.
* When max_task_retries is not 0, subsequently submitted tasks will be queued on the caller side and we only send them to the callee when the callee actor is back to the ALIVE state.
TODO
- [x] Add test cases.
- [ ] Update docs.
- [x] API change review.
This PR adds more example data for ongoing feature guide work. In addition to adding the new datasets, this also puts all example data under examples/data in order to separate it from the example code.
To facilitate easy demo/documentation/testing with realistic, small-sized yet ML-familiar data. Have it as a source file with code will make it self-contained, i.e. after user "pip install" Ray, they are all set to run it.
IRIS is a great fit: super classic ML dataset, simple schema, only 150 rows.
Aiming to:
1. addressing the bug about concurrency group, see #19593
2. improving the stability of the ray call latency perf in online applications.
we're proposing using async post instead of `PostBlocking` in threadpool.
Note that since we have already had back pressure in the caller side, I believe this change is safe to merge and it doesn't break any behavior.
This PR uses the job id and group name as the prefix for storing meta information, aiming to provide the isolate ability for different jobs and different groups.
Before this PR, we can't use 2 groups in 1 Ray cluster, and we can not rerun a collective job once the last one is failed at initializing.
Some tests relying on AutoScalingCluster are flaky because ray.init() after AutoscalingCluster.start() is not guaranteed to work. Sometimes, it cannot find any running ray instances.
This was a holdover from when local resources for URIs were deleted directly from the runtime env agent, and the URI name itself needed to store the information of which plugin it corresponded to so the appropriate plugin's `delete()` function could be called. After the URI reference refactor, I don't think this is needed anymore.
Adds `get_dataframe()` method to pass through experiment analysis to result grid. Also fixes config dictionary returns which previously did not flatten the dict (even though the docstring suggested it would)
This PR supports setting the jars for an actor in Ray API. The API looks like:
```java
class A {
public boolean findClass(String className) {
try {
Class.forName(className);
} catch (ClassNotFoundException e) {
return false;
}
return true;
}
}
RuntimeEnv runtimeEnv = new RuntimeEnv.Builder()
.addJars(ImmutableList.of("https://github.com/ray-project/test_packages/raw/main/raw_resources/java-1.0-SNAPSHOT.jar"))
.build();
ActorHandle<A> actor1 = Ray.actor(A::new).setRuntimeEnv(runtimeEnv).remote();
boolean ret = actor1.task(A::findClass, "io.testpackages.Foo").remote().get();
System.out.println(ret); // true
```
This PR support reconnection of the GCS client in gRPC channel layer. Previously this is implemented in the application layer:
- Health check is in the application layer by starting a new channel.
- Monitor the GCS address change and do resubscribe.
- Always retry the failed request and do reconnection in case of a failure.
However, there are several issues with this approach:
- We need service to discover for GCS address change.
- Monitor is too heavy since it always creates a channel.
- Reconnection is a blocking call that prevents the code from running.
This new approach moves the reconnection to gRPC layer directly to fix these issues.
- DNS name resolution is done by gRPC so we don't need to write this.
- Health check is done by checking the channel state.
- Queue the failed call and retry once GCS is up so that it's not a blocking call.
Under heap memory pressure, the raylet is often killed by the OS OOM killer. This is bad because it can cause whole system crashes and it is difficult to find the error afterwards. This PR adjusts the OOM score for any workers that the raylet spawns so that the OOM killer will hopefully prioritize killing those instead of the raylet.
Adds support for chunking large schedule calls. Needed to support ray.remote calls with more than 2GiB of arguments.
Deprecates the args and kwargs fields of ClientTask and replaces them with a data field that contains a tuple of the serialized args and kwargs fields, which can be chunked and reassembled more easily using the same logic as PutRequest's.
A simple way to redirect remote job output to a custom file is extremely useful.
Especially if it does not require any code changes for the user, and especially if it captures all output and not limited only to python logging.
When the PR will be merged, the user will be able to do the following (for example):
ray submit some_cluster_config.yaml example_runnable_script.py --screen --extra-screen-args "-Logfile /gpfs/usr/someone/ray/output_log.txt"
which will, in addition to creating the screen session, also output (continuously) to custom text file.
It allows additional flexibility, for example, the user will be able to choose custom screen session name etc.
Closes#24514 by filtering out pods with metadata.deletionTimestamp set in the KuberayNodeProvider.
Adds some e2e test logic to confirm reasonable behavior when Ray worker pod termination hangs.
Also, a bit of code cleanup -- defining constants, adding to doc strings, etc.
HEBOSearch algorithm currently fails if the config search space contains a categorical parameter where each category is an iterable.
For instance, choosing the hidden layers of a NN:
` hyperparam_search_space = {'hidden_sizes': tune.choice([[512, 256, 128], [1024, 512, 256]])}`
This is due to the creation of the Pandas DataFrame with HEBO suggested parameters, without explicitly telling Pandas that the hyper-parameter suggestion is a single row of data while the index is being defined as a single row. This results in an exception such as "ValueError: Length of values (3) does not match length of index (1)".
Co-authored-by: Makan Arastuie <makan.arastuie@seagate.com>