`GcsResourceManager::UpdateResourceCapacity` modifies `cluster_scheduling_resources_`, but the method is only used in C++ unit tests, which is confusing when reading the code. Since it can be completely replaced by `GcsResourceManager::OnNodeAdd`, just remove it.
Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
* fix null pointer crash when GcsResourceManager::SetAvailableResources
* add warning log when node does not exist
* add unit test
Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
This is a work-in-progress PR that splits cluster_task_manager into local and distributed parts.
For the distributed scheduler (cluster_task_manager_):
/// Schedules a task onto one node of the cluster. The logic is as follows:
/// 1. Queue tasks for scheduling.
/// 2. Pick a node on the cluster which has the available resources to run a
/// task.
/// * Step 2 should occur any time the state of the cluster is
/// changed, or a new task is queued.
/// 3. For tasks that are infeasible, put them into the infeasible queue and
/// report them to the GCS, where the autoscaler will be notified and start
/// a new node to accommodate the requirement.
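
The three steps above can be sketched roughly as follows. This is a minimal illustration in Python, not the actual C++ implementation: the `Node` and `ClusterScheduler` classes, their fields, and the first-fit node selection are all assumptions made for the example.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Node:
    # Hypothetical node record: total capacity and currently free resources.
    total: dict
    available: dict


@dataclass
class ClusterScheduler:
    nodes: dict                                     # node_id -> Node
    pending: deque = field(default_factory=deque)   # step 1: queued tasks
    infeasible: list = field(default_factory=list)  # step 3: no node can ever fit

    def queue_task(self, task_id, demand):
        """Step 1: queue the task. A newly queued task re-triggers step 2."""
        self.pending.append((task_id, demand))
        return self.schedule()

    def schedule(self):
        """Step 2: try to place every pending task. Returns {task_id: node_id}."""
        placed = {}
        still_pending = deque()
        while self.pending:
            task_id, demand = self.pending.popleft()
            node_id = self._pick_node(demand)
            if node_id is not None:
                node = self.nodes[node_id]
                for name, amount in demand.items():
                    node.available[name] -= amount
                placed[task_id] = node_id
            elif not self._feasible_anywhere(demand):
                # Step 3: park the task; a real system would report it upstream.
                self.infeasible.append((task_id, demand))
            else:
                still_pending.append((task_id, demand))
        self.pending = still_pending
        return placed

    @staticmethod
    def _fits(resources, demand):
        return all(resources.get(k, 0) >= v for k, v in demand.items())

    def _pick_node(self, demand):
        # First-fit policy, chosen only to keep the sketch short.
        for node_id, node in self.nodes.items():
            if self._fits(node.available, demand):
                return node_id
        return None

    def _feasible_anywhere(self, demand):
        return any(self._fits(n.total, demand) for n in self.nodes.values())
```

In this sketch, `schedule()` would also be called whenever the cluster state changes (for example after resources are released), which corresponds to the note under step 2.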
For the local task manager:
/// It manages the lifetime of a task on the local node. It receives requests from
/// cluster_task_manager (the distributed scheduler) and does the following
/// steps:
/// 1. Pull task dependencies and add the task to the to_dispatch queue.
/// 2. Once task's dependencies are all pulled locally, the task becomes ready
/// to dispatch.
/// 3. For all tasks that are dispatch-ready, we schedule them by acquiring
/// local resources (including pinning the objects in memory and deducting
/// cpu/gpu and other resources from local resource manager), and start
/// a worker.
/// 4. If a task fails to acquire resources in step 3, we will try to
/// spill it to a different remote node.
/// 5. When a worker finishes executing its task(s), the requester will return
/// it and we should release the resources in our view of the node's state.
/// 6. If a task has been waiting for arguments for too long, it will also be
/// spilled back to a different node.
///
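
A rough Python sketch of steps 1 to 5 of the local task manager is shown below. It is an illustration under stated assumptions, not the real C++ class: the method names, the separate `waiting` map (the description above puts tasks into to_dispatch right away), and the `spilled` list are all hypothetical, and step 6 (the argument-wait timeout) is left out.

```python
class LocalTaskManager:
    """Toy model of the local task lifecycle (hypothetical names throughout)."""

    def __init__(self, available):
        self.available = dict(available)  # local view of free resources
        self.waiting = {}                 # task_id -> (demand, missing deps)
        self.to_dispatch = []             # tasks whose dependencies are local
        self.running = {}                 # task_id -> resources held by a worker
        self.spilled = []                 # tasks handed back for another node

    def submit(self, task_id, demand, deps):
        # Step 1: start tracking dependencies that still have to be pulled.
        self.waiting[task_id] = (demand, set(deps))
        self._promote(task_id)

    def on_object_local(self, object_id):
        # Step 2: a dependency arrived; tasks with nothing missing become ready.
        for task_id in list(self.waiting):
            self.waiting[task_id][1].discard(object_id)
            self._promote(task_id)

    def _promote(self, task_id):
        demand, missing = self.waiting[task_id]
        if not missing:
            del self.waiting[task_id]
            self.to_dispatch.append((task_id, demand))

    def dispatch(self):
        # Step 3: acquire local resources and "start a worker".
        # Step 4: if the resources cannot be acquired, spill the task.
        queue, self.to_dispatch = self.to_dispatch, []
        for task_id, demand in queue:
            if all(self.available.get(k, 0) >= v for k, v in demand.items()):
                for k, v in demand.items():
                    self.available[k] -= v
                self.running[task_id] = demand
            else:
                self.spilled.append(task_id)

    def on_worker_returned(self, task_id):
        # Step 5: release the resources held by the finished task.
        for k, v in self.running.pop(task_id).items():
            self.available[k] += v
```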
This PR adds the following stage fusion optimizations (off by default). In a later PR, I plan to enable this by default for DatasetPipelines.
- Stage fusion: Whether to fuse compatible OneToOne stages.
- Read stage fusion: Whether to fuse read stages into downstream OneToOne stages. This is accomplished by rewriting the read stage (LazyBlockList) into a transformation over a collection of read tasks (BlockList -> MapBatches(do_read)).
- Shuffle stage fusion: Whether to fuse compatible OneToOne stages into shuffle stages that support specifying a map-side block UDF.
Stages are considered compatible if their compute strategy is the same ("tasks" vs "actors") and they have the same Ray remote args. Currently, the PR ignores the remote args of read tasks, but this will be fixed in a follow-up (I didn't want to change the read tasks default here).
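
To make the compatibility rule concrete, here is a minimal Python sketch of fusing adjacent OneToOne stages. The `OneToOneStage` dataclass and the `can_fuse`, `fuse`, and `fuse_stages` helpers are hypothetical names for illustration and do not mirror the actual Datasets internals.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class OneToOneStage:
    # Hypothetical stage record: a block -> block function plus its settings.
    name: str
    block_fn: Callable
    compute: str = "tasks"  # "tasks" or "actors"
    remote_args: dict = field(default_factory=dict)


def can_fuse(a, b):
    """Compatible means same compute strategy and same remote args."""
    return a.compute == b.compute and a.remote_args == b.remote_args


def fuse(a, b):
    """Compose two stages so each block is transformed in a single pass."""
    def fused(block, f=a.block_fn, g=b.block_fn):
        return g(f(block))
    return OneToOneStage(f"{a.name}->{b.name}", fused, a.compute, dict(a.remote_args))


def fuse_stages(stages):
    """Greedily merge each stage into its predecessor when compatible."""
    out = []
    for stage in stages:
        if out and can_fuse(out[-1], stage):
            out[-1] = fuse(out[-1], stage)
        else:
            out.append(stage)
    return out
```

With this sketch, two "tasks" stages followed by an "actors" stage collapse into two stages: the fused pair and the untouched actor stage.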
Adds a unit-tested and restructured ray_release package for running release tests.
Relevant changes in behavior:
By default, Buildkite will wait for the wheels of the current commit to be available. Alternatively, users can specify a) a different commit hash, b) a wheels URL (which we will also wait for to be available), or c) a branch (or user/branch combination), in which case the latest available wheels will be used (e.g. if master is passed, the behavior matches the old default behavior).
The main subpackages are:
Cluster manager: Creates cluster envs/computes, starts cluster, terminates cluster
Command runner: Runs commands, e.g. as client command or sdk command
File manager: Uploads/downloads files to/from session
Reporter: Reports results (e.g. to database)
Much of the code base is unit tested, but there are probably some pieces missing.
Example build (waited for wheels to be built): https://buildkite.com/ray-project/kf-dev/builds/51#_
Wheel build: https://buildkite.com/ray-project/ray-builders-branch/builds/6023
When we vendor third-party code, we should update the LICENSE file. Previously we vendored two pieces of code:
- conda utilities from MLflow
- virtualenv-clone
But we only included the attribution in the relevant source files, not in our LICENSE file. This PR adds the necessary info to our LICENSE file.
Why are these changes needed?
Switches GetObject from unary-unary to unary-streaming so that large objects can be streamed across multiple messages (currently hardcoded to 64MiB chunks). This will allow users to retrieve objects larger than 2GiB from a remote cluster. If the transfer is interrupted by a recoverable gRPC error (i.e. temporary disconnect), then the request will be retried starting from the first chunk that hasn't been received yet.
Proto changes
GetRequests now have the field start_chunk_id, which indicates which chunk to start from (useful if we have to retry a request after already receiving some chunks). GetResponses now have a chunk_id (0-indexed chunk of the serialized object), total_chunks (total number of chunks, used in async transfers to determine when all chunks have been received), and total_size (the total size of the object in bytes, used to raise user warnings if the object being retrieved is very large).
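
The interplay of start_chunk_id, chunk_id, and total_chunks can be sketched in plain Python without gRPC. Everything here is an assumption for illustration: the `Chunk` dataclass stands in for a GetResponse, `stream_chunks` stands in for the server-side generator, `fetch` stands in for the client retry loop, and `ConnectionError` stands in for a recoverable gRPC error. Sending one empty chunk for an empty object is also an assumption of the sketch.

```python
from dataclasses import dataclass


@dataclass
class Chunk:
    # Stand-in for the chunk-related GetResponse fields described above.
    chunk_id: int
    total_chunks: int
    total_size: int
    data: bytes


def stream_chunks(payload, chunk_size, start_chunk_id=0):
    """Server side: yield chunks of `payload`, starting at `start_chunk_id`."""
    total_size = len(payload)
    # Ceiling division; an empty payload still produces one (empty) chunk.
    total_chunks = max(1, -(-total_size // chunk_size))
    for chunk_id in range(start_chunk_id, total_chunks):
        start = chunk_id * chunk_size
        yield Chunk(chunk_id, total_chunks, total_size,
                    payload[start:start + chunk_size])


def fetch(open_stream, max_retries=3):
    """Client side: reassemble chunks, resuming after a dropped connection.

    `open_stream(start_chunk_id)` is a hypothetical callable that returns an
    iterator of Chunk objects, like a server-streaming stub call would.
    """
    received = bytearray()
    next_chunk_id = 0
    for _ in range(max_retries + 1):
        try:
            for chunk in open_stream(next_chunk_id):
                if chunk.chunk_id != next_chunk_id:
                    raise RuntimeError("chunk received out of order")
                received += chunk.data
                next_chunk_id += 1
                if next_chunk_id == chunk.total_chunks:
                    return bytes(received)
        except ConnectionError:
            # Retry from the first chunk that has not been received yet.
            continue
    raise ConnectionError("gave up after repeated disconnects")
```

Because the client only advances `next_chunk_id` after appending a chunk, a retry never re-downloads data it already holds.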
Server changes
Mainly just updating the GetObject logic to yield chunks instead of returning a single response.
Client changes
At the moment, objects can be retrieved directly from the raylet servicer (ray.get) or asynchronously over the datapath (await some_remote_func.remote()). In both cases, the request will error if the chunk isn't valid (server side error) or if a chunk is received out of order (shouldn't happen in practice, since gRPC guarantees that messages in a stream either arrive in order or not at all).
ray.get is fairly straightforward, and changes are mainly to accommodate yielding from the stub instead of taking the value directly.
await some_remote_func.remote() is similar, but to keep things consistent with other async handling, collecting the chunks is handled by a ChunkCollector, which wraps around the original callback.
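
The idea of a collector that wraps the original callback can be sketched as below. Only the name ChunkCollector comes from this PR; the constructor, the call signature, and the boolean return value are assumptions made for this example.

```python
class ChunkCollector:
    """Buffers chunks and fires the wrapped callback once all have arrived."""

    def __init__(self, callback):
        self.callback = callback   # the original callback, called exactly once
        self.data = bytearray()
        self.last_seen_chunk = -1

    def __call__(self, chunk_id, total_chunks, data):
        # Chunks must arrive in order; anything else is treated as an error.
        if chunk_id != self.last_seen_chunk + 1:
            raise RuntimeError(
                f"expected chunk {self.last_seen_chunk + 1}, got {chunk_id}"
            )
        self.data += data
        self.last_seen_chunk = chunk_id
        if chunk_id == total_chunks - 1:
            # Final chunk: hand the reassembled object to the original callback.
            self.callback(bytes(self.data))
            return True
        return False
```

Wrapping the callback this way keeps the rest of the async path unchanged: callers still register a single callback and receive a single complete object.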
These tests are passing without issues on my Windows machine, so unskipping them to check on CI.
I will push the linting changes separately so that the test suite runs twice, to confirm that the flakiness is removed.
Co-authored-by: Philipp Moritz <pcmoritz@gmail.com>
The reason for not using `queue.Queue` for multiprocessing purposes on Windows is explained at https://stackoverflow.com/a/37244276 and in the second reply to https://stackoverflow.com/a/37245300
The reason for using `multiprocessing.JoinableQueue` over `multiprocessing.Queue` is explained at https://stackoverflow.com/a/30725121
AFAIK, this is because on Windows each process gets its own `Queue`, and hence nothing is shared among those processes. When `multiprocessing.Queue` is used, changes to it are shared internally via pipes, along with proper locks.
1. For the scheduling optimization, we should encapsulate `SchedulingResources`, `GcsNodeInfo` and other node-related information into a `NodeContext`, which requires `SchedulingResources` to be shareable. This PR does not include the `NodeContext` transformation itself; it only makes `SchedulingResources` shareable.
2. `cluster_scheduling_resources_` holds raw `SchedulingResources` objects, which adds some overhead when the container rehashes (even though std::move is used during rehashing).
Co-authored-by: 黑驰 <senlin.zsl@antgroup.com>
Add a patch for newer setuptools; it can be removed once grpc 1.44 is released.
Why are these changes needed?
With grpc updated to 1.43, one of the patches is not needed.
Patch needed when building locally for newer setuptools version. See grpc/grpc#28392 for more details.
Also needed as a prereq to #21221
This reverts commit 6235b6d7e9.
Looks like windows://python/ray/tests:test_dataclient_disconnect has a similar level of flakiness as before the revert. This seems unrelated, and the test needs to be fixed in another way.