Adds an API to the REST server, the SDK, and the CLI for listing all jobs that have been submitted, along with their information.
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
DatasetPipeline execution is coordinated by a pool of actors and optionally the driver process. To recover from failures with lineage reconstruction, we need to keep these actors alive as long as the driver is alive. Currently, they are spread randomly throughout the cluster, so they can be killed during a node failure.
This PR pins the actors to the same node as the driver so that they will survive any other node failures. It's also okay if the driver node dies, since the driver itself will also die.
Don't re-use task workers for actors, since those workers may own objects that will be lost on actor exit.
This adds a slight performance penalty for actor startup.
This PR fixes some minor bugs in `to_dict` and `from_dict` for the runtime env protobuf and adds a test to cover this codepath. The test checks that `to_dict` and `from_dict` are inverses. This PR contains all fixes required to make the test pass.
This change is needed for object fusing to see performance increases on HDD. Currently, smaller object writes are slow even with fusing since the writes are not buffered (negating the point of fusing). Benchmarks show that while the default is sufficient for fast SSDs, on a slow HDD, increasing the buffer size reduces write times by several magnitudes.
### Performance Changes
A microbenchmark where 500KB objects were produced (then spilled) and consumed to observe changes in object fusing/spilling.
| Run | Produce (s) | Consume (s) | Total (s) |
| -- | -- | -- | -- |
| Baseline (original) | 347.332281 | 355.611272 | 705.560750 |
| Baseline (w/ fix) | 181.815852 | 347.692850 | 532.847759 |
| No fusing (original) | 453.574554 | 525.047998 | 981.620108 |
| No fusing (w/ fix) | 452.614848| 519.787698 | 975.412639 |
The baseline runs should be notably faster due to object fusing reducing I/O requests. With the fix, Ray's defaults allow this microbenchmark to have a 48% time reduction with negligible impact on runtime when fusing is disabled.
See [this followup](https://github.com/ray-project/ray/pull/22618#issuecomment-1054838715) for information on the differences between SSD and HDD performance with different buffer sizes.
Co-authored-by: Ubuntu <ubuntu@ip-172-31-54-240.us-west-2.compute.internal>
This is useful in combining multiple applied groups produced by groupby().map_groups() into a single one. For example, builder = BlockBuilder.for_block(type(batch)), and then for each applied group, builder.add_block(applied_group).
The AMI's for ray.head.default and ray.worker.default in defaults.yaml supersede the default AMI for the region (defaults get merged in before _check_ami is called, causes problems if region isn't us-west-2). Removes the default AMI from defaults.yaml, and aborts if user doesn't specify an AMI in a region without a default.
A follow up change of #22348
example is not up to date and we can not bring up the cluster due to missing configmap. Autoscaler is able to convert CR to autoscaler config so we don't need configmap anymore.
I recently realized that during a runtime_env creation process, a plugin/manager that is very slow to setup may block the creation of other runtime_env, so I make plugin/manager setup run in threads.
[The refactor of `PipManager`](https://github.com/ray-project/ray/pull/22381) is about to be completed, so I ignore it in this PR.
Combine `ParsedRuntimeEnv` and `RuntimeEnv` into `ray.runtime.RuntimeEnv`, details: #21495
- The `new RuntimeEnv` includes all external interfaces of `ParsedRuntimeEnv` and `old RuntimeEnv`.
- The `new RuntimeEnv` will be exposed directly to the user.
- example:
```python
runtime_env = ray.runtime_env.RuntimeEnv(working_dir="s3://workding_dir.zip",
pip=["requests"],
java_jars=["s3://jar1.zip"],
java_jvm_options=["-Dxxx=xxx"])
```
In Envs with K8S and enabled SELinux there is a bug:
"/proc/nvidia/" is not allowed to mount in container
So, i made a rework for GPU detection based on GPutil package.
## Checks
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
- [x] Release tests
Co-authored-by: Mopga <a14415641@cab-wsm-0010669.sigma.sbrf.ru>
Co-authored-by: Julius <juliustfrost@gmail.com>
- Moved all `Deployment` instance creation to `DeploymentNode` level with only relevant info passed into it from `generate.py`. This abstraction makes more sense and less leaky.
- In `DeploymentNode`, we leverage ray core DAG's `_PyObjScanner` to find and replace only Deployment nodes init args & kwargs to deployment handle, which is only specific to `Deployment` instance, but not `DeploymentNode` itself. However this is the simplest and most robust way to handle nested args at `DAGNode` level.
- This implementation lives in ray core DAGNode level so we don't need to expose `_PyObjScanner` directly.
- Added serve pipeline tests to BUILD CI.
Whenever we spill, we try to spill all spillable objects. We also try to fuse small objects together to reduce total IOPS. If there aren't enough objects in the object store to meet the fusion threshold, we spill the objects anyway to avoid liveness issues.
However, the current logic always spills once we reach the end of the spillable objects or once we've reached the fusion threshold. This can produce lots of unfused objects if they are created concurrently with the spill.
This PR changes the spill logic: once we reach the end of the spillable objects, if the last batch of spilled objects is under the fusion threshold, we'll only spill it if we don't have other spills pending too. This gives the pending spills time to finish, and then we can re-evaluate whether it's necessary to spill the remaining objects. Liveness is also preserved.
This PR fixes initializations artifacts related to the load metric summary and autoscaler summary.
Load metrics summaries are defined to be Falsey if the autoscaler has never received a resource message from the GCS.
We skip most autoscaler actions if load metrics is Falsey, because it doesn't makes sense to autoscale without load metrics. This also allows us to execute the TODO here: #22348 (comment) and remove the time.wait().
As for the autoscaler summary, it is possible for autoscaler.summary() to error outside of an autoscaler update in this scenario:
The very first call to NodeProvider.non_terminated_nodes fails, self.non_terminated_nodes remains a None object, and autoscaler.summary() fails trying to get an attribute of this None object.
The result is a confusing error message, as in #22515. This PR fixes that.
Closes#22515
- Added backbone of ray dag -> serve dag transformation and deployment extraction.
- Added util functions for deployment unique name generation .. ray_actor_options, replacement of DeploymentNode with deployment handle, etc.
Soft restarts don't work for tensorflow since there is still some leftover communication state in the actors which may lead to undefined behavior, such as causing training to hang.
Instead, this PR changes the failure handling for tensorflow to match torch and horovod, and recreates all the workers in case of failure. Also adds a test to check if fault tolerance works correctly for an actual tensorflow example. When testing locally, the test failed before the change, but passes after.
This change adds the GET, PUT, and DELETE commands for Serve’s REST API. The dashboard receives these commands and issues corresponding requests to the Serve controller.
In order to initialize runtime env concurrently, this PR makes pip runtime env asynchronous. It includes,
- [x] New `check_output_cmd` in runtime env utils.
- [x] Async PipProcessor.
- [x] The `asynccontextmanager` from `https://github.com/python-trio/async_generator` for Python 3.6
- [x] Remove pip runtime env lock.
- [x] Disable pip cache.
Co-authored-by: 刘宝 <po.lb@antfin.com>
This PR enables stage fusion for dataset pipelines. This also requires:
1. Removing the num_cpus=0.5 default for the read stage, to enable fusion of the read stage.
2. Removing spread_resource_prefix (not supported for now).
We should just encourage people to use the existing `get_runtime_context` API instead of introducing a new one here. Just removing the docs for now while we discuss this.
Why are these changes needed?
Data from PutRequests is chunked into 64MiB messages over the datastream, to avoid the 2GiB message size limit from gRPC. This will allow users to transfer objects larger than 2GiB over the network.
Proto changes
Put requests now have fields for chunk_id to identify which chunk data belongs to, total_chunks to identify the total number of chunks in the object, and total_size for total size in bytes of the object (useful for raising warnings).
PutObject is still unary-unary. The dataservicer handles reassembling the chunks before passing the result to the underlying RayletServicer.
Dataclient changes
If a put request is inserted into the request queue, self._requests will chunk it lazily. Doing this lazily is important since inserting all of the chunks onto the request queue immediately would double the amount of memory needed to handle a large request. This also guarantees that the chunks of a given putrequest will be contiguous
Dataservicer changes
The dataservicer now maintains some state to track received chunks. Once all chunks for a putrequest are received, the combined chunks are passed to the raylet servicer.