diff --git a/benchmarks/distributed/test_many_actors.py b/benchmarks/distributed/test_many_actors.py index 94c3ee21f..4c2109cf9 100644 --- a/benchmarks/distributed/test_many_actors.py +++ b/benchmarks/distributed/test_many_actors.py @@ -33,7 +33,7 @@ def test_max_actors(): def no_resource_leaks(): - return ray.available_resources() == ray.cluster_resources() + return test_utils.no_resource_leaks_excluding_node_resources() ray.init(address="auto") diff --git a/benchmarks/distributed/test_many_pgs.py b/benchmarks/distributed/test_many_pgs.py index 268607968..5595eda1d 100644 --- a/benchmarks/distributed/test_many_pgs.py +++ b/benchmarks/distributed/test_many_pgs.py @@ -59,7 +59,7 @@ def test_many_placement_groups(): def no_resource_leaks(): - return ray.available_resources() == ray.cluster_resources() + return test_utils.no_resource_leaks_excluding_node_resources() ray.init(address="auto") diff --git a/benchmarks/distributed/test_many_tasks.py b/benchmarks/distributed/test_many_tasks.py index 764981e72..2b1ed2e9c 100644 --- a/benchmarks/distributed/test_many_tasks.py +++ b/benchmarks/distributed/test_many_tasks.py @@ -42,7 +42,7 @@ def test_max_running_tasks(num_tasks): def no_resource_leaks(): - return ray.available_resources() == ray.cluster_resources() + return test_utils.no_resource_leaks_excluding_node_resources() @click.command() diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 11f69039f..0fada5d39 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1268,3 +1268,14 @@ def check_spilled_mb(address, spilled=None, restored=None, fallback=None): return True wait_for_condition(ok, timeout=3, retry_interval_ms=1000) + + +def no_resource_leaks_excluding_node_resources(): + cluster_resources = ray.cluster_resources() + available_resources = ray.available_resources() + for r in ray.cluster_resources(): + if "node" in r: + del cluster_resources[r] + del available_resources[r] + + return ray.available_resources() == ray.cluster_resources() diff --git a/release/benchmarks/distributed/test_many_actors.py b/release/benchmarks/distributed/test_many_actors.py index cf6c2fdaa..cbba2ead5 100644 --- a/release/benchmarks/distributed/test_many_actors.py +++ b/release/benchmarks/distributed/test_many_actors.py @@ -35,7 +35,7 @@ def test_max_actors(): def no_resource_leaks(): - return ray.available_resources() == ray.cluster_resources() + return test_utils.no_resource_leaks_excluding_node_resources() ray.init(address="auto") diff --git a/release/benchmarks/distributed/test_many_pgs.py b/release/benchmarks/distributed/test_many_pgs.py index e1ae74fc8..876000047 100644 --- a/release/benchmarks/distributed/test_many_pgs.py +++ b/release/benchmarks/distributed/test_many_pgs.py @@ -61,7 +61,7 @@ def test_many_placement_groups(): def no_resource_leaks(): - return ray.available_resources() == ray.cluster_resources() + return test_utils.no_resource_leaks_excluding_node_resources() ray.init(address="auto") diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py index 764981e72..2b1ed2e9c 100644 --- a/release/benchmarks/distributed/test_many_tasks.py +++ b/release/benchmarks/distributed/test_many_tasks.py @@ -42,7 +42,7 @@ def test_max_running_tasks(num_tasks): def no_resource_leaks(): - return ray.available_resources() == ray.cluster_resources() + return test_utils.no_resource_leaks_excluding_node_resources() @click.command() diff --git a/release/benchmarks/many_nodes.yaml b/release/benchmarks/many_nodes.yaml index 0df91bc7c..9c0dbe57b 100644 --- a/release/benchmarks/many_nodes.yaml +++ b/release/benchmarks/many_nodes.yaml @@ -14,7 +14,7 @@ head_node_type: worker_node_types: - name: small_worker - instance_type: m5.xlarge + instance_type: m5.2xlarge min_workers: 249 max_workers: 249 use_spot: false diff --git a/release/benchmarks/object_store.yaml b/release/benchmarks/object_store.yaml index 2b63e317d..6908c9e9b 100644 --- a/release/benchmarks/object_store.yaml +++ b/release/benchmarks/object_store.yaml @@ -12,7 +12,7 @@ head_node_type: worker_node_types: - name: worker_node - instance_type: m4.xlarge + instance_type: m4.2xlarge min_workers: 49 max_workers: 49 use_spot: false diff --git a/release/ray_release/job_manager.py b/release/ray_release/job_manager.py index 2dc8ffe47..05c71d4d6 100644 --- a/release/ray_release/job_manager.py +++ b/release/ray_release/job_manager.py @@ -10,6 +10,7 @@ from ray_release.logger import logger from ray_release.util import ANYSCALE_HOST from ray_release.cluster_manager.cluster_manager import ClusterManager from ray_release.exception import CommandTimeout +from ray_release.util import exponential_backoff_retry class JobManager: @@ -51,11 +52,18 @@ class JobManager: self.start_time[command_id] = time.time() return command_id + def _get_job_status_with_retry(self, command_id): + job_client = self._get_job_client() + return exponential_backoff_retry( + lambda: job_client.get_job_status(self.job_id_pool[command_id]), + retry_exceptions=Exception, + initial_retry_delay_s=1, + max_retries=3, + ) + def _wait_job(self, command_id: int, timeout: int): from ray.job_submission import JobStatus # noqa: F811 - job_client = self._get_job_client() - start_time = time.monotonic() timeout_at = start_time + timeout next_status = start_time + 30 @@ -73,11 +81,11 @@ class JobManager: f"({int(now - start_time)} seconds) ..." ) next_status += 30 - status = job_client.get_job_status(self.job_id_pool[command_id]) + status = self._get_job_status_with_retry(command_id) if status in {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED}: break time.sleep(1) - status = job_client.get_job_status(self.job_id_pool[command_id]) + status = self._get_job_status_with_retry(command_id) # TODO(sang): Propagate JobInfo.error_type if status == JobStatus.SUCCEEDED: retcode = 0