Dask defaults to a disk-based shuffle even though we're using a distributed scheduler, which appears to result in dropped data since the filesystem isn't shared across nodes. Dask Distributed manually sets the shuffle algorithm in the global config to the task-based shuffle, and the Dask-on-Ray scheduler should probably do the same.
This PR adds a Dask config helper, `enable_dask_on_ray`, that sets Dask-on-Ray as the default scheduler and changes the default shuffle to a task-based shuffle. The shuffle method can still be overridden by the user by manually specifying `df.set_index(shuffle="disk")`; a usage sketch follows.
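As a rough usage sketch (the helper name comes from this PR; the exact import path and the example dataframe are assumptions):

```python
import pandas as pd
import dask.dataframe as dd
import ray
from ray.util.dask import enable_dask_on_ray  # import path is an assumption

ray.init()
enable_dask_on_ray()  # Dask-on-Ray becomes the default scheduler; shuffle defaults to task-based

df = dd.from_pandas(pd.DataFrame({"a": range(100), "b": range(100)}), npartitions=10)
df.set_index("a").compute()                  # uses the task-based shuffle
df.set_index("a", shuffle="disk").compute()  # per-call override still honored
```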
This change adds support for parsing `--address` as the bootstrap address and treating `--port` as the GCS port when using GCS for bootstrapping.
Not launching Redis in GCS bootstrapping mode, and using GCS to fetch initial cluster information, will be implemented in a subsequent change.
Also made some cleanups.
* updating azure autoscaler versions and backwards compatibility, and moving to azure-identity based authentication
* adding azure sdk requirements for tests
* updating azure test requirements and adding wrapper function for azure sdk function resolution
* adding docstring to get_azure_sdk_function
Co-authored-by: Scott Graham <scgraham@microsoft.com>
Currently, the URI reference logic in raylet is:
- For the job level, add a URI reference when the job starts and remove it when the job finishes.
- For the actor level, add and remove URI references for detached actors only.
In this PR, the logic is optimized to:
- For the job level, first check whether the runtime env should be installed eagerly. If true, add or remove the URI reference.
- For the actor level (a sketch of the reference counting follows this list):
* First, add a URI reference when starting the worker process, to avoid the runtime env being GC'd before the worker registers.
* Second, add a URI reference for each worker thread of the worker process; we remove that reference when the worker disconnects.
- Besides, we move the `RuntimeEnvManager` instance from `node_manager` to `worker_pool`.
- Enable the test `test_actor_level_gc` and add some tests in Python and in the worker pool test.
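The real reference counting lives in the raylet's C++ worker pool; the following Python sketch only illustrates the two-stage scheme described above, with purely illustrative names:

```python
from collections import defaultdict

class UriRefCounter:
    """Illustrative two-stage URI reference counting for runtime envs."""

    def __init__(self):
        self._refs = defaultdict(int)

    def on_worker_process_start(self, uris):
        # Stage 1: pin the runtime env while the process starts, so it
        # cannot be GC'd before the worker registers.
        for uri in uris:
            self._refs[uri] += 1

    def on_worker_registered(self, uris):
        # Stage 2: the registered worker takes its own reference, then the
        # stage-1 process-start reference is released.
        for uri in uris:
            self._refs[uri] += 1
        self._release(uris)

    def on_worker_disconnected(self, uris):
        # Drop the worker's reference; at zero the URI becomes GC-eligible.
        self._release(uris)

    def _release(self, uris):
        for uri in uris:
            self._refs[uri] -= 1
            if self._refs[uri] == 0:
                del self._refs[uri]
```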
Previously, GcsClient accepted only Redis. To make it work without Redis, we need to be able to pass the GCS address to the GCS client as well.
In this PR, we add GCS-related info into GcsClientOptions so that we can connect to GCS directly with the GCS address.
This PR is part of GCS bootstrap. In a follow-up PR, we'll add functionality to set the correct GcsClientOptions based on flags.
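The actual `GcsClientOptions` is a C++ class; the following Python dataclass is only a hedged illustration of the two bootstrap paths it now has to carry, with assumed field and constructor names:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class GcsClientOptionsSketch:
    # Legacy path: bootstrap through Redis.
    redis_ip: Optional[str] = None
    redis_port: Optional[int] = None
    # New path added in this PR: connect to GCS directly by address.
    gcs_address: Optional[str] = None

    @classmethod
    def from_redis_address(cls, ip: str, port: int):
        return cls(redis_ip=ip, redis_port=port)

    @classmethod
    def from_gcs_address(cls, address: str):
        return cls(gcs_address=address)
```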
The current resource reporting is broken in OSS; revert the change. For example, it reported
InitialConfigResources: {node:172.31.45.118: 1.000000}, {object_store_memory: 468605759.960938 GiB}
for a 10GB-memory object store.
This PR implements gRPC timeouts for various blocking RPCs.
Previously, the timeout with a promise didn't work properly because the client didn't cancel timed-out RPCs. This PR implements RPC timeouts properly (a sketch of the pattern follows below).
This PR supports:
- Blocking RPCs for core APIs: creating / getting / removing actors + placement groups.
- Internal KV ops.
The global state accessor also has infinite blocking calls which we need to fix, but fixing them requires a huge refactoring, so that will be done in a separate PR.
The same applies to the placement group calls (they will be handled in a separate PR).
Also, this means we can have scenarios where the client receives a DEADLINE_EXCEEDED error but the handler is still invoked. Right now, this is not handled correctly in Ray; we should start thinking about how to handle these scenarios better.
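Ray's implementation lives in C++, but a minimal Python sketch of the pattern (actively cancel the in-flight call when the deadline fires, instead of abandoning the promise) might look like this; `stub_method` and the overall shape are assumptions, not Ray's actual code:

```python
import grpc

def call_with_timeout(stub_method, request, timeout_s: float):
    """Issue a unary RPC and actively cancel it if it times out."""
    call = stub_method.future(request)  # async gRPC call returning a grpc.Future
    try:
        return call.result(timeout=timeout_s)
    except grpc.FutureTimeoutError:
        call.cancel()  # don't leak the timed-out RPC on the client side
        raise
```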
This adds memory monitoring to the scalability envelope tests so that we can compare peak memory usage between non-HA & HA.
NOTE: the current way of adding the memory monitor is not great, and we should implement a fixture to support this better, but that's not in progress yet.
The current logs API simply returns a `str` to unblock development and integration. We should add proper log streaming for better UX and external job manager integration.
Co-authored-by: Sven Mika <sven@anyscale.io>
Co-authored-by: sven1977 <svenmika1977@gmail.com>
Co-authored-by: Ed Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
Co-authored-by: Simon Mo <simon.mo@hey.com>
Co-authored-by: Avnish Narayan <38871737+avnishn@users.noreply.github.com>
Co-authored-by: Jiao Dong <jiaodong@anyscale.com>
Uses a direct `pip install` instead of creating a conda env to make pip installs incremental to the cluster environment.
Separates the handling of `pip` and `conda` dependencies.
The new `pip` approach still works if only base Ray is installed on the cluster and the user specifies libraries like "ray[serve]" in the `pip` field. The mechanism is as follows (see the sketch after this list):
- We don't actually want to reinstall Ray via pip, since this could lead to version mismatch issues. Instead, we want to use the Ray that's already installed on the cluster.
- So if "ray" was included by the user in the pip list, remove it.
- If a library "ray[serve]" or "ray[tune, rllib]" was included in the pip list, remove it and replace it with its dependencies (e.g. "uvicorn", "requests", ...).
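A hedged Python sketch of that rewrite step (the helper names and the extras-to-dependencies mapping are illustrative, not the PR's actual code):

```python
import re

# Hypothetical mapping of Ray extras to their dependencies; the real code
# would resolve these from Ray's package metadata.
_EXTRA_DEPS = {
    "serve": ["uvicorn", "requests"],
    "tune": ["pandas", "tabulate"],
    "rllib": ["gym", "lz4"],
}

def rewrite_pip_list(pip_packages):
    """Drop bare 'ray' and replace 'ray[extra, ...]' with the extras' deps."""
    rewritten = []
    for pkg in pip_packages:
        match = re.fullmatch(r"ray(\[(?P<extras>[\w\s,]+)\])?", pkg.strip())
        if match is None:
            rewritten.append(pkg)  # unrelated package: keep as-is
        elif match.group("extras"):
            for extra in match.group("extras").split(","):
                rewritten.extend(_EXTRA_DEPS.get(extra.strip(), []))
        # bare "ray": dropped, so the cluster's existing Ray install is reused
    return rewritten

# e.g. rewrite_pip_list(["ray[serve]", "numpy"]) -> ["uvicorn", "requests", "numpy"]
```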
Co-authored-by: architkulkarni <arkulkar@gmail.com>
Co-authored-by: architkulkarni <architkulkarni@users.noreply.github.com>
The `results` with fewer tasks contain unstable wait times, so the number of tasks was increased in the hope of less noisy wait times. Minor changes in the assert comparisons have also been made to reduce error.
Before this PR, GcsActorManager::CreateActor() would replace an actor's namespace with the actor's owner job's namespace, even if the actor was created by the user with a user-specified namespace. But in named_actors_, the actor was registered under the user-specified namespace by GcsActorManager::RegisterActor() before CreateActor() was called. As a result, GcsActorManager::DestroyActor() failed to find the actor in named_actors_ under the owner job's namespace, so the actor was never removed from named_actors_, and reusing the actor name in the same namespace failed because the same-name actor was still present.
issue #20611