Mostly cluster tests are enabled in this PR in the above mentioned files. Some non-cluster tests are also enabled. All of these pass on my machine without issues.
Runtime Environments is already GA in Ray 1.6.0. The latest doc is [here](https://docs.ray.io/en/master/ray-core/handling-dependencies.html#runtime-environments). And now, we already supported a [inheritance](https://docs.ray.io/en/master/ray-core/handling-dependencies.html#inheritance) behavior as follows (copied from the doc):
- The runtime_env["env_vars"] field will be merged with the runtime_env["env_vars"] field of the parent. This allows for environment variables set in the parent’s runtime environment to be automatically propagated to the child, even if new environment variables are set in the child’s runtime environment.
- Every other field in the runtime_env will be overridden by the child, not merged. For example, if runtime_env["py_modules"] is specified, it will replace the runtime_env["py_modules"] field of the parent.
We think this runtime env merging logic is so complex and confusing to users because users can't know the final runtime env before the jobs are run.
Current PR tries to do a refactor and change the behavior of Runtime Environments inheritance. Here is the new behavior:
- **If there is no runtime env option when we create actor, inherit the parent runtime env.**
- **Otherwise, use the optional runtime env directly and don't do the merging.**
Add a new API named `ray.runtime_env.get_current_runtime_env()` to get the parent runtime env and modify this dict by yourself. Like:
```Actor.options(runtime_env=ray.runtime_env.get_current_runtime_env().update({"X": "Y"}))```
This new API also can be used in ray client.
- Separate spread scheduling and default hydra scheduling (i.e. SpreadScheduling != HybridScheduling(threshold=0)): they are already separated in the API layer and they have the different end goals so it makes sense to separate their implementations and evolve them independently.
- Simple round robin for spread scheduling: this is just a starting implementation, can be optimized later.
- Prefer not to spill back tasks that are waiting for args since the pull is already in progress.
While investigating #22161, it is observed GIL is held for an extended amount of time (up to 1000s) with stack trace [1]. It is possible either there are many iterations within `Pickle5Writer.write_to()` calling `ray::parallel_memcopy()`, or a few `ray::parallel_memcopy()` taking a long time (less likely). Either way, `ray::parallel_memcopy()` or `std::memcpy()` should not hold GIL.
This flag has been turned on by default for almost 4 months. Delete the old code so that when refactoring, we don't need to take care of the legacy code path.
The existing Job info in the cluster snapshot uses the old definition of Job, which is a single Ray driver (a single `ray.init()` connection).
In the new Job Submission protocol, a Job just specifies an entrypoint which can be any shell command. As such a Job can have zero or multiple Ray drivers. This means we should add a new snapshot entry corresponding to new jobs. We'll leave the old snapshot in place for legacy jobs.
- Also fixes `get_all_jobs` by using the appropriate KV namespace, and stripping the job key KV prefix from the job ID. It wasn't working before.
- This PR also unifies the datatype used by the GET jobs/ endpoint to be the same as the one used by the new jobs cluster snapshot. For backwards compatibility, the `status` and `message` fields are preserved.
In the implementation of `GcsResourceManager::UpdateResourceCapacity`, 'cluster_scheduling_resources_' is modified, but this method is only used in c++ unit test, it is easy to cause confuse when reading the code. Since this method can be completely replaced by `GcsResourceManager::OnNodeAdd`, just remove it.
Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
* fix null pointer crash when GcsResourceManager::SetAvailableResources
* add warning log when node does not exist
* add unit test
Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
This is a working in progress PR that splits cluster_task_manager into local and distributed parts.
For the distributed scheduler (cluster_task_manager_:
/// Schedules a task onto one node of the cluster. The logic is as follows:
/// 1. Queue tasks for scheduling.
/// 2. Pick a node on the cluster which has the available resources to run a
/// task.
/// * Step 2 should occur any time the state of the cluster is
/// changed, or a new task is queued.
/// 3. For tasks that's infeasible, put them into infeasible queue and reports
/// it to gcs, where the auto scaler will be notified and start new node
/// to accommodate the requirement.
For the local task manager:
/// It Manages the lifetime of a task on the local node. It receives request from
/// cluster_task_manager (the distributed scheduler) and does the following
/// steps:
/// 1. Pulling task dependencies, add the task into to_dispatch queue.
/// 2. Once task's dependencies are all pulled locally, the task becomes ready
/// to dispatch.
/// 3. For all tasks that are dispatch-ready, we schedule them by acquiring
/// local resources (including pinning the objects in memory and deduct
/// cpu/gpu and other resources from local resource manager), and start
/// a worker.
/// 4. If task failed to acquire resources in step 3, we will try to
/// spill it to a different remote node.
/// 5. When a worker finishes executing its task(s), the requester will return
/// it and we should release the resources in our view of the node's state.
/// 6. If a task has been waiting for arguments for too long, it will also be
/// spilled back to a different node.
///
This PR adds the following stage fusion optimizations (off by default). In a later PR, I plan to enable this by default for DatasetPipelines.
- Stage fusion: Whether to fuse compatible OneToOne stages.
- Read stage fusion: Whether to fuse read stages into downstream OneToOne stages. This is accomplished by rewriting the read stage (LazyBlockList) into a transformation over a collection of read tasks (BlockList -> MapBatches(do_read)).
- Shuffle stage fusion: Whether to fuse compatible OneToOne stages into shuffle stages that support specifying a map-side block UDF.
Stages are considered compatible if their compute strategy is the same ("tasks" vs "actors"), and they have the same Ray remote args. Currently, the PR is ignoring the remote args of read tasks, but this will be fixed as a followup (I didn't want to change the read tasks default here).
Adds a unit-tested and restructured ray_release package for running release tests.
Relevant changes in behavior:
Per default, Buildkite will wait for the wheels of the current commit to be available. Alternatively, users can a) specify a different commit hash, b) a wheels URL (which we will also wait for to be available) or c) specify a branch (or user/branch combination), in which case the latest available wheels will be used (e.g. if master is passed, behavior matches old default behavior).
The main subpackages are:
Cluster manager: Creates cluster envs/computes, starts cluster, terminates cluster
Command runner: Runs commands, e.g. as client command or sdk command
File manager: Uploads/downloads files to/from session
Reporter: Reports results (e.g. to database)
Much of the code base is unit tested, but there are probably some pieces missing.
Example build (waited for wheels to be built): https://buildkite.com/ray-project/kf-dev/builds/51#_
Wheel build: https://buildkite.com/ray-project/ray-builders-branch/builds/6023
When we vendor third-party code, we should update LICENSE file. Previously we vendored two pieces of code:
- conda utilities from MLflow
- virtualenv-clone
But we only included the attribution in the relevant source files, not in our LICENSE file. This PR adds the necessary info to our LICENSE file.