In test_client_reconnect.py, each test case starts a Ray cluster via the client server's default_connect_handler(). The Ray cluster shuts down implicitly when start_middleman_server() returns and Python garbage-collects the client server. After turning on GCS pubsub, the point at which the client server is GC'ed changes. Sometimes the Ray cluster from a previous test case stays alive after the next test case starts and only shuts down later, leading to test failures due to lost data or crashes (a race during worker shutdown, to be investigated separately).
This PR makes sure each test case shuts down its Ray cluster.
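As a rough illustration (not the exact change in this PR), a pytest fixture can guarantee that the cluster started by one test is torn down before the next test runs; `shutdown_only` is a hypothetical fixture name here:

```python
import pytest
import ray


@pytest.fixture
def shutdown_only():
    # Run the test first, then make sure any Ray cluster it started
    # is shut down before the next test case begins.
    yield None
    ray.shutdown()


def test_put_and_get(shutdown_only):
    ray.init()
    assert ray.get(ray.put(1)) == 1
```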
In Python or C++, we can specify the bundle index as -1 to use any available bundle in the placement group. We should enable this in Java as well to keep the API consistent across all languages.
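For reference, a minimal Python sketch of the existing behavior that the Java API would mirror (the resource shapes are illustrative):

```python
import ray
from ray.util.placement_group import placement_group

ray.init(num_cpus=2)
pg = placement_group([{"CPU": 1}, {"CPU": 1}])
ray.get(pg.ready())


@ray.remote(num_cpus=1)
def f():
    return "ok"


# bundle_index=-1 means "schedule onto any available bundle in the group".
ref = f.options(
    placement_group=pg,
    placement_group_bundle_index=-1,
).remote()
print(ray.get(ref))
```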
This PR renames the enum value `ActorLifetime.DEFAULT` to `ActorLifetime.NON_DETACHED`. This is safe because `ActorLifetime` had not been introduced in any released version (<= 1.9.2).
Co-authored-by: Qing Wang <jovany.wq@antgroup.com>
This PR introduces APIs for statically defining concurrency groups in Java.
We introduce two annotations:
1. `@DefConcurrencyGroup`, applied to an actor class to define a concurrency group statically.
2. `@UseConcurrencyGroup`, applied to an actor method to specify which concurrency group the method runs in.
An example is below:
```java
@DefConcurrencyGroup(name = "io", maxConcurrency = 2)
@DefConcurrencyGroup(name = "compute", maxConcurrency = 4)
private static class MyActor {
  @UseConcurrencyGroup(name = "io")
  public long f1() { return 1; }

  @UseConcurrencyGroup(name = "io")
  public long f2() { return 2; }

  @UseConcurrencyGroup(name = "compute")
  public long f3(int a, int b) { return a + b; }

  @UseConcurrencyGroup(name = "compute")
  public long f4() { return 4; }
}

ActorHandle<MyActor> myActor = Ray.actor(MyActor::new).remote();
myActor.task(MyActor::f1).remote();
myActor.task(MyActor::f2).remote();
myActor.task(MyActor::f3, 3, 5).remote();
myActor.task(MyActor::f4).remote();
```
`MyActor` has 3 concurrency groups: `io` with max concurrency 2, `compute` with max concurrency 4, and `default` with max concurrency 1.
`f1` and `f2` will be executed in `io`; `f3` and `f4` will be executed in `compute`.
This PR fixes and re-enables the following tests in HA mode:
- //python/ray/tests:test_healthcheck
- //python/ray/tests:test_autoscaler_drain_node_api
- //python/ray/tests:test_ray_debugger
Previously, reference arguments were handled incorrectly: the object ref itself was serialized and passed as the args buffer to the user function, instead of the resolved `RayObject`.
That's because CoreWorker is the component responsible for ensuring that all ObjectReferences are resolved and serialized into `RayObject`s at the time of the `task_execution_callback` invocation, not any component downstream of the callback.
This resulted in the following error for large objects, which are not passed inline as `TaskArg::value` because they exceed 100KB.
```
C++ exception with description "Invalid: invalid arguments: std::bad_cast" thrown in the test body.
```
This was not caught because there was no test covering large objects; such a test has now been added.
When cleaning up the function table, we delete the data by prefix. But currently the prefix contains binary data, which doesn't work well with Redis KEYS/SCAN patterns that append `*` to the prefix.
For example, when the job id increases to 41, the cleanup also deletes the keys for job 1, which causes new workers to fail to import the function.
This PR uses the hex representation of the job id to avoid this.
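As a rough illustration of why hex encoding helps (the key names below are made up, not Ray's actual schema): a raw job-id byte can itself be a glob metacharacter such as `*`, while hex digits cannot:

```python
# Illustrative only: byte 42 is ord("*"), so a raw binary prefix combined
# with a trailing "*" in a KEYS/SCAN pattern can match keys of other jobs.
raw_prefix = b"RemoteFunction:" + bytes([42])

# Hex encoding uses only the characters [0-9a-f], so the resulting pattern
# can only match keys of this specific job.
hex_prefix = b"RemoteFunction:" + bytes([42]).hex().encode()

print(raw_prefix)  # b'RemoteFunction:*'
print(hex_prefix)  # b'RemoteFunction:2a'
```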
Previous changes failed because of a) permission errors and b) unzip being unavailable on remote nodes. We now use gzipped tar archives instead.
This reverts commit 42bcab27e8.
This PR adds pandas block format support by implementing `PandasRow`, `PandasBlockBuilder`, `PandasBlockAccessor`.
Note that `sort_and_partition`, `combine`, `merge_sorted_blocks`, and `aggregate_combined_blocks` in `PandasBlockAccessor` redirect to the Arrow block format implementation for now. They'll be implemented in a later PR.
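As a simplified sketch of the idea (not the actual `PandasBlockBuilder` implementation), a pandas-backed block builder accumulates rows and materializes them as a `pandas.DataFrame` block:

```python
import pandas as pd


class SimplePandasBlockBuilder:
    """Simplified illustration of a pandas-backed block builder."""

    def __init__(self):
        self._rows = []

    def add(self, row: dict) -> None:
        # Accumulate one row mapping column name -> value.
        self._rows.append(row)

    def build(self) -> pd.DataFrame:
        # Materialize the accumulated rows as a pandas DataFrame block.
        return pd.DataFrame(self._rows)


builder = SimplePandasBlockBuilder()
builder.add({"x": 1, "y": "a"})
builder.add({"x": 2, "y": "b"})
print(builder.build())
```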
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
Co-authored-by: Eric Liang <ekhliang@gmail.com>
Fix a dlmalloc allocation bug; details in #21310.
* Fix the dlmalloc bug
* Lint fixes
* Address review comments
* Use `_check_spilled_mb`
* Add a C++ unit test
If a task is re-executed on failure, it will deterministically generate the same IDs for any ray.put or .remote task calls because it uses its own task ID as a seed. This can cause problems if those objects conflict with previous versions that still exist in the cluster.
This PR adds the execution attempt number to the current task ID seed. This avoids collisions with any ObjectIDs generated by the previous execution attempt of the task.
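A rough sketch of the idea (the hash function and byte widths are illustrative, not Ray's actual ID scheme): mixing the attempt number into the seed makes a retried task generate different IDs than its previous attempt did:

```python
import hashlib


def child_object_id(task_id: bytes, put_index: int, attempt_number: int) -> bytes:
    # Deterministic within one attempt: the same (task_id, put_index,
    # attempt_number) always yields the same ID, but a different attempt
    # of the same task yields different IDs.
    seed = (
        task_id
        + put_index.to_bytes(4, "little")
        + attempt_number.to_bytes(4, "little")
    )
    return hashlib.sha1(seed).digest()


first = child_object_id(b"task-123", put_index=0, attempt_number=0)
retry = child_object_id(b"task-123", put_index=0, attempt_number=1)
assert first != retry  # no collision with IDs from the previous attempt
```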
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com>
In Ray, functions are exported to the function table at runtime, but they are never cleaned up after use. This PR garbage-collects those entries once no job or detached actor references them anymore.
Ideally, the function table import/export feature should move into core. As a step in that direction, this PR introduces a GCS function manager, which currently handles reference counting only.
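A simplified sketch of the reference-counting idea (the names are hypothetical, not the actual GCS function manager API):

```python
class FunctionTableRefCounter:
    """Keeps a job's exported functions alive while any job or detached actor uses them."""

    def __init__(self, cleanup_fn):
        self._counts = {}
        self._cleanup_fn = cleanup_fn  # e.g. deletes the job's function-table entries

    def add_reference(self, job_id: str) -> None:
        self._counts[job_id] = self._counts.get(job_id, 0) + 1

    def remove_reference(self, job_id: str) -> None:
        self._counts[job_id] -= 1
        if self._counts[job_id] == 0:
            # Nothing references this job's exported functions anymore;
            # garbage-collect them from the function table.
            del self._counts[job_id]
            self._cleanup_fn(job_id)
```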
External Redis should still be supported with GCS bootstrapping, to avoid breaking users.
In GCS bootstrapping mode, some external-Redis logic is removed:
- Printing external Redis addresses to terminal: hard to implement across `ray start`, `ray.init()` and Ray cluster util.
- Starting local Redis if external Redis is unavailable: failing loudly here seems more appropriate.
Also, re-enable a few tests that restart GCS in GCS bootstrapping mode, by using external Redis for KV storage.
Currently we install OpenSSH on the fly in fake multi-node Docker testing. We can speed testing up a fair bit by first building a Docker image that includes OpenSSH and then running the tests with that image.
In some cases, a task added to `running_tasks` is never removed, which adds wait time for all subsequent tasks due to the worker cap. One such case is lease request cancellation: the request is cancelled after `PopWorker` is called, and the task is never removed from `running_tasks`.