To make other system or internal project reuse ray deps bazel function, we need change this local accessing style to global accessing with ray-project namespace.
Co-authored-by: 林濯 <lingxuzn.zlx@antgroup.com>
C++ API need to call python and java worker, this pr support call python worker. Call python worker is similar with call c++ worker, need to pass PyFunction, PyActorClass and PyActorMethod.
## call python normal task
```python
#test_cross_language_invocation.py
import ray
@ray.remote
def py_return_input(v):
return v
```
c++ api call python function
```c++
auto py_obj1 = ray::Task(ray::PyFunction</*ReturnType*/int>{/*module_name=*/"test_cross_language_invocation",
/*function_name=*/"py_return_input"})
.Remote(42);
EXPECT_EQ(42, *py_obj1.Get());
```
The user need to fill python module name and function name, then pass arguments into the remote.
The user also need to assign the return type and arguments types of the python function, it used to do static safe checking and get result.
## call python actor task
```python
#test_cross_language_invocation.py
@ray.remote
class Counter(object):
def __init__(self, value):
self.value = int(value)
def increase(self, delta):
self.value += int(delta)
return str(self.value)
```
c++ api call python actor function
```c++
// Create python actor
auto py_actor_handle =
ray::Actor(ray::PyActorClass{/*module_name=*/"test_cross_language_invocation", /*class_name=*/"Counter"})
.Remote(1);
EXPECT_TRUE(!py_actor_handle.ID().empty());
// Call python actor task
auto py_actor_ret =
py_actor_handle.Task(ray::PyActorMethod</*ReturnType*/std::string>{/*actor_function_name=*/"increase"}).Remote(1);
EXPECT_EQ("2", *py_actor_ret.Get());
```
The user need to fill python module name and class name when creating python actor.
PyActorMethod only need to fill the function name.
It's also similar with calling c++ actor task, also has compile-time safe checking.
GCS pubsub uses long polling, so the subscriber waits instead of returning None from polling when there is no buffered log. It needs a different heuristic to decide if the driver is not keeping up with logs from the worker.
This PR add four tests for many tasks:
many short tasks send from the single node
many short tasks send from multiple nodes
many long tasks send from multiple nodes
many long tasks send from the single node
TODO: migrate many nodes actor tests to this one.
scheduling envelop should contain:
(tasks): scheduling_test_many_xx_tasks_yy_nodes
(actors):many_nodes_actor_test (to be combined with this one)
(shuffle): pipelined_ingestion_1500_gb_15_windows
(shuffle): dask_on_ray_1tb_sort
Making some minor fixes.
1. Update input `batch_size` to be global batch size. Introduce `worker_batch_size` so each iteration trains same global batch size.
2. Update dataset `size` calculation to only refer to the fraction of the data that is trained on each worker. This allows calculations (e.g. training progress, accuracy) to be correct.
3. Add `model.train()` for generality.
4. Remove `smoke-test` flag since it's not really being used.
- Tolerate GRPC deadline exceeded and transient failures in Python GCS AIO subscribers, which becomes consistent with Python GCS synchronous subscribers.
- Tolerate any exception in dashboard for subscribing to logs and error info, which becomes consistent with how dashboard handles GRPC errors for obtaining node stats.
This PR is a pre-work before actually fixing a thread-safety bug within shutdown.
It is doing
- Add better logging upon core worker shutdown.
- Improve document around core worker shutdown.
- Remove unnecessary pointer usage from periodical runner for clean destruction order.
- Remove unnecessary `WaitForShutdown` API and combine them into a single `Shutdown` API.
On machines without GPUs, this can run subprocesses that spew to
stderr. Then with log_to_driver=True, we get log spew from every
single raylet. To avoid this, disable the GPU usage check on
certain errors.
Resolves#14305
Co-authored-by: hauntsaninja <>
When a node is dead, reference table should remove locations for those objects on the node. Otherwise locality-aware scheduling will schedule tasks to the dead node.
This is a minimum viable product for Ray Autoscaler integration with Kuberay. It is not ready for prime time/general use, but should be enough for interested parties to get started (see the documentation in kuberay.md).
I updated this version compatibility table on the release branch but didn't update it on master. This is my mistake, the process is to make a PR to master and then cherry pick that commit to the release branch.
GCS, when running as an individual component, can cause other components to fail in case of crashes.
Here are two main cases covered in this patch:
1. monitor.py will raise an exception when disconnected from GCS.
2. When GCS becomes available later than other components, the missing KV of GCS address can cause other components to fail to start.
In our patch, we fixed these two issues as well as increased the timeout for redis connection which was too small.
Co-authored-by: Mingwei Tian <mwtian@anyscale.com>
An object can get created/pinned twice if the original worker fails mid-task, or when lineage reconstruction is enabled. This can cause inconsistencies in the LocalObjectManager if the second creation races with object spilling and/or object free. For example:
1. Object X get created, then is pending spill.
2. Object X is freed by original owner because it goes out of scope.
3. Task that created X gets re-executed due to failure.
4. Task recreates X, which can now get spilled again while the original copy is also being spilled/freed.
This PR better enforces the state machine for objects managed by the LocalObjectManager. An object can be either: pinned, pending spill, or spilled. If we receive a free message from the owner, we do not delete the object metadata until all shared-memory and spilled copies of the object are removed.
Use a separate event loop for pubsub work, to provide some isolation from other workload. There is no benchmark result but the downside, if there is any, should not be large.
The first migration of test into k8s. We are adopting a conservative approach (migrate slowly while we keep existing test suites). Once things are confirmed to be stable, we will migrate with more speed.
This fixes the previous problems from team column revert.
This has 2 additional changes;
alert handler receives the team argument, which was the root cause of breakage; https://github.com/ray-project/ray/pull/21289
Previously, tests without a team column were raising an exception, but I made the condition weaker (warning logs). I will eventually change it to raise an exception, but for smoother transition, we will log warning instead for a short time
Prior to this PR, sort, groupby, and aggregate defined separate types for extracting values from Dataset records. This was confusing since the user had to understand the differences between the different key types (which were basically exactly the same).
This PR defines a common key type: KeyFn, which is simply Union[None, str, Callable[[T], Any]]. This is used as sort(KeyFn...), aggregate(Agg(KeyFn)...), groupby(KeyFn).agg(Agg(KeyFn), ...).
It also unifies the error generation paths to a common _validate_key_fn utility. This also improves the errors generated when passing explicit AggregateFn classes, which previously failed in the workers if invalid.
When a session startup times out due to resources not being available, the session may still come up after that timeout. At that time the control script (e2e.py) is already terminated, so the session runs until the autosuspend limit is hit, incurring unnecessary costs. Instead, we should always trigger session termination on session timeout.
See #21458. Currently, Tune keeps its own list of alive node IPs, but this information is only updated every 10 seconds and is usually stale when a new node is added. Because of this, the first trial scheduled on this node is usually marked as failed. This PR adds a test confirming this behavior and gets rid of the unneeded code path.
Co-authored-by: Xiaowei Jiang <xwjiang2010@gmail.com>
Co-authored-by: xwjiang2010 <87673679+xwjiang2010@users.noreply.github.com>
In test_client_reconnect.py, each test case starts a Ray cluster via client server's default_connect_handler(). The Ray cluster shuts down implicitly when the start_middleman_server() ended and Python GC'es the client server. After turning on GCS pubsub, the time when client server is GC'ed changes. Sometimes the Ray cluster from a previous test cases stays alive after the next test case starts and shuts down later, leading to test failures due to lost data or crashes (race during worker shutdown, will be investigated separately).
This PR makes sure each test case shuts down its Ray cluster.
In python or C++, we can specify the bundle index as -1 to use any available bundle in the placement group. We should also enable it in Java to keep the API consistent across all languages.