Adds some metrics useful for object-intensive workloads:
Per raylet/object manager:
Add num bytes pending restore to spill manager
Add num requests cumulative to PullManager
Num bytes pushed/pulled from other nodes cumulative
Histogram for request latencies in PullManager:
total life time of request, from start to cancel
request satisfaction time, from start to object local
pull time, from object activation to object local
Per-node disk read/write speed, IOPS
* Make default memory 1
* Add test to validate that ReplicaConfig's default memory cannot be lower than minimum
* Add a new option to memory_omitted_options
* Update if branch in test_replica_config_default_memory_minimum
* Make memory default value None
We use tarfile to pack/unpack directories in several locations. Instead of using temporary files, we can just use io.BytesIO to avoid unnecessary disk writes.
Note that this functionality is present in 3 different modules - in Ray (AIR), in the release test package, and in a specific release test. The implementations should live in the three modules independently, so we don't add a common utility for this (e.g. the ray_release package should be independent of the Ray package).
Current logic looks broken, as reported in #22954 (comment)
I fixed the logic as best as I can, and tested it on Anyscale platform with GPU. No process info was reported from gpustat. But the logic works under this case.
"ResourceRequest" now uses 2 containers: a vector for predefined resources, and a map for custom resources.
This was intended to be a perf optimization. However, in practice, this makes the code more complex, and, moreover, prevents optimizations for some methods (e.g., "ResourceIds", "Size").
This PR removes the vector and makes ResourceRequest use only one map for all resources. Also, "ResourceIds" now returns a "boost:range" to allow iterating resource IDs without having to construct temporary sets.
microbenchmark shows a slight perf improvement.
last nightly: `placement group create/removal per second 837.76 +- 16.68`.
this PR: `placement group create/removal per second 895.76 +- 16.99`.
There are a few changes:
1. Between runner thread and main thread: The same stacktrace is raised in `_report_thread_runner_error` in main thread. So we could spare this raise in runner thread.
2. Between function runner and Tune driver: Do not wrap RayTaskError in TuneError.
3. Within Tune driver code: Introduces a per errored trial error.pkl and uses that to populate ResultGrid.
Plus some cleanups to facilitate propagating exception in runner and executor code.
Final stacktrace looks like: (omitted)
In Tune, we are capturing `traceback.format_exc` at the time the exception is caught and just pass the string around. This PR slightly changes that only in the case of when RayTaskError is raised, and we pass that object around.
It may be worthwhile to settle down on a practice of error handling in Tune in general.
I am also curious to learn how other ray library does that and any good lessons to learn.
In particular, we should watch out for memory leaking in exception handling. Not sure if it is still a problem in python 3, but here are some articles I came across for reference
https://cosmicpercolator.com/2016/01/13/exception-leaks-in-python-2-and-3/
This PR fixes the issue of diverging documentation between Ray Docs and ecosystem library readmes which live in separate repos (eg. xgboost_ray). This is achieved by adding an extra step before the docs build process starts that downloads the readmes of specified ecosystem libraries from their GitHub repositories. The files are then preprocessed by a very simple parser to allow for differences between GitHub and Docs markdowns.
In summary, this makes the markdown files in ecosystem library repositories single sources of truth and removes the need to manually keep the doc pages up to date, all the while allowing for differences between what's rendered on GitHub and in the Docs.
See ray-project/xgboost_ray#204 & https://ray--23505.org.readthedocs.build/en/23505/ray-more-libs/xgboost-ray.html for an example.
Needs ray-project/xgboost_ray#204 and ray-project/lightgbm_ray#30 to be merged first.
As discussed in #23424, the synch=True mode of PopulationBasedTrainingScheduler is (1) not compatible with burn_in_period and (2) causes the presence of TERMINATED trials to hang PAUSED trials indefinitely.
This change addresses (1) by setting the initial _next_perturbaton_sync to the max of burn_in_period and perturbation_interval in the constructor and (2) by checking only whether live trials have reached the _next_perturbation_sync before resuming PAUSED trials.
As we (@scv119 @iycheng @raulchen @Chong-Li @WangTaoTheTonic ) discussed offline, the GcsResourceScheduler on the GCS side should be unified to ClusterResourceScheduler.
There is already a big PR( #23268 ) to do this, but in order to make review easy, I will split it to two or mall small PRs.
This is [3/n]:
Move the implementation of all policies from gcs_resource_scheduler to bundle_scheduling_plocy
Delete gcs_resource_scheduler
Refactor gcs_resource_scheduler_test to cluster_resource_scheduler_2_test
BTW: The interface inside ISchedulingPolicy should be refactor in another PR, see the discussion #23323 (comment)
To be clear:
scorer related codes are moved out from gcs_resoruce_scheduler to scorer.h/.cc and no logic changes.
Policy related codes are moved out from gcs_resoruce_scheduler to bundle_scheduling_policy.h/.cc, and a small part of the logic in "GcsResourceScheduler::Schedule" is distributed into each policy.
Some codes inside gcs_placement_group_scheduler.h/.cc are changed to adapt to new data structure (SchedulingResult and SchedulingContext)
Support filtering tests by test attr regex filters. Multiple filters can be specified with one line for each filter. The format is attr:regex (e.g. team:serve)
This PR addresses recent failures in the tune cloud tests.
In particular, this PR changes the following:
The trial runner will now wait for potential previous syncs to finish before syncing once more if force=True is supplied. This is to make sure that the final experiment checkpoints exist in the most recent version on remote storage. This likely fixes some flakiness in the tests.
We switched to new cloud buckets that don't interfere with other tests (and are less likely to be garbage collected)
We're now using dated subdirectories in the cloud buckets so that we don't interfere if two tests are run in parallel. Objects are cleaned up afterwards. The buckets are configured to remove objects after 30 days.
Lastly, we fix an issue in the cloud tests where the RELEASE_TEST_OUTPUT file was unavailable when run in Ray client mode (as e.g. in kubernetes).
Local release test runs succeeded.
https://buildkite.com/ray-project/release-tests-branch/builds/189https://buildkite.com/ray-project/release-tests-branch/builds/191
## Why are these changes needed?
This PR refactor the resource syncer to decouple it from GCS and raylet. GCS and raylet will use the same module to sync data. The integration will happen in the next PR.
There are several new introduced components:
* RaySyncer: the place where remote and local information sits. It's a coordinator layer.
* NodeState: keeps track of the local status, similar to NodeSyncConnection.
* NodeSyncConnection: keeps track of the sending and receiving information and make sure not sending the information the remote node knows.
The core protocol is that each node will send {what it has} - {what the target has} to the target.
For example, think about node A <-> B. A will send all A has exclude what B has to B.
Whenever when there is new information (from NodeState or NodeSyncConnection), it'll be passed to RaySyncer broadcast message to broadcast.
NodeSyncConnection is for the communication layer. It has two implementations Client and Server:
* Server => Client: client will send a long-polling request and server will response every 100ms if there is data to be sent.
* Client => Server: client will check every 100ms to see whether there is new data to be sent. If there is, just use RPC call to send the data.
Here is one example:
```mermaid
flowchart LR;
A-->B;
B-->C;
B-->D;
```
It means A initialize the connection to B and B initialize the connections to C and D
Now C generate a message M:
1. [C] RaySyncer check whether there is new message generated in C and get M
2. [C] RaySyncer will push M to NodeSyncConnection in local component (B)
3. [C] ServerSyncConnection will wait until B send a long polling and send the data to B
4. [B] B received the message from C and push it to local sync connection (C, A, D)
5. [B] ClientSyncConnection of C will not push it to its local queue since it's received by this channel.
6. [B] ClientSyncConnection of D will send this message to D
7. [B] ServerSyncConnection of A will be used to send this message to A (long-polling here)
8. [B] B will update NodeState (local component) with this message M
9. [D] D's pipelines is similar to 5) (with ServerSyncConnection) and 8)
10. [A] A's pipeline is similar to 5) and 8)
A common user confusion is that their dataset parallelism is limited by the number of files. Add a warning if the available parallelism is much less than the specified parallelism, and tell the user to repartition() in that case.
Continuation of #22449
Fix pip activation so something like this will not crash
```
ray.init(runtime_env={"pip": ["toolz", "requests"]})
```
Also enable test that hit this code path.
Various improvements to Ray Train fault tolerance.
Add more log statements for better debugging of Ray Train failure handling.
Fixes [Bug] [Train] Cannot reproduce fault-tolerance, script hangs upon any node shutdown #22349.
Simplifies fault tolerance by removing backend specific handle_failure. If any workers have failed, all workers will be restarted and training will continue from the last checkpoint.
Also adds a test for fault tolerance with an actual torch example. When testing locally, the test hangs before the fix, but passes after.
Running Datasets shuffle with 1TB data and 2k partitions sometimes times out due to a failed object fetch. This happens because the object directory notifies the PullManager that the object is already on the local node, even though it isn't. This seems to be a bug in the object directory.
To work around this on the PullManager side, this PR filters out the current node from the list of locations provided by the object directory. @jjyao confirmed that this fixes the issue for Datasets shuffle.
The current behavior of workflow's `.options()` is to **completely rewrite all the options** rather than **update options**, this is less intuitive and inconsistent with the behavior of `.options()` in remote functions.
For example:
```
# Remote Function
@ray.remote(num_cpus=2, max_retries=2)
f.options(num_cpus=1)
```
`options()` here **updated** num_cpus while **the rest options are untouched**, i.e. max_retires is still 2. This is the expected behavior and more intuitive.
```
# Workflow Step
@workflow.step(num_cpus=2, max_retries=2)
f.options(num_cpus=1)
```
`options()` here **completely drop all existing options** and only set num_cpus, i.e. previous value of max_retires (2) is dropped and reverted to default (3). This will also drop other fields like `name` and `metadata` if name and metadata are given in the decorator but not in the options().
This is the first PR to refactor scheduler data structures (See #22850).
Major changes:
- Hid the implementation details in the `ResourceRequest` and `TaskResourceInstnaces` classes, which expose public methods such as algebra operators and comparison operators.
- Hid the differences between "predefined" and "custom" resources inside these 2 classes. Call sites can simply use the resource ID to access the resource, no matter it is predefined or custom.
- The predefined_resources vector now always has the full length. So no more "resize"s are needed.
- Removed the `ResourceCapacity` class. Now "total" and "available" resources are stored in separate fields in "NodeResources".
- Moved helper functions for FixedPoint vectors from "cluster_resource_data.h" to "fixed_point.h"
- "ResourceID" now has static methods to get the resource ids of predefined resources, e.g. "ResourceID::CPU()".
- Encapsulated unit-instance resource logic to "ResourceID"
Other planned changes that are not included in this PR:
- Rename ResourceRequest to ResourceSet, and move it to its own file.
- Remove the predefined vectors and always use maps.
Co-authored-by: Chong-Li <lc300133@antgroup.com>