Remove DEFAULT_SCHEDULING_STRATEGY and SPREAD_SCHEDULING_STRATEGY (#22558)

This commit is contained in:
Jiajun Yao 2022-02-22 21:34:21 -08:00 committed by GitHub
parent abf2a70a29
commit 82443aec63
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 12 additions and 28 deletions

View file

@ -125,8 +125,6 @@ from ray.exceptions import (
) )
from ray import external_storage from ray import external_storage
from ray.util.scheduling_strategies import ( from ray.util.scheduling_strategies import (
DEFAULT_SCHEDULING_STRATEGY,
SPREAD_SCHEDULING_STRATEGY,
PlacementGroupSchedulingStrategy, PlacementGroupSchedulingStrategy,
) )
import ray.ray_constants as ray_constants import ray.ray_constants as ray_constants
@ -1459,9 +1457,9 @@ cdef class CoreWorker:
CPlacementGroupSchedulingStrategy \ CPlacementGroupSchedulingStrategy \
*c_placement_group_scheduling_strategy *c_placement_group_scheduling_strategy
assert python_scheduling_strategy is not None assert python_scheduling_strategy is not None
if python_scheduling_strategy == DEFAULT_SCHEDULING_STRATEGY: if python_scheduling_strategy == "DEFAULT":
c_scheduling_strategy[0].mutable_default_scheduling_strategy() c_scheduling_strategy[0].mutable_default_scheduling_strategy()
elif python_scheduling_strategy == SPREAD_SCHEDULING_STRATEGY: elif python_scheduling_strategy == "SPREAD":
c_scheduling_strategy[0].mutable_spread_scheduling_strategy() c_scheduling_strategy[0].mutable_spread_scheduling_strategy()
elif isinstance(python_scheduling_strategy, elif isinstance(python_scheduling_strategy,
PlacementGroupSchedulingStrategy): PlacementGroupSchedulingStrategy):

View file

@ -10,7 +10,6 @@ import ray.worker
from ray.util.annotations import PublicAPI from ray.util.annotations import PublicAPI
from ray.util.placement_group import configure_placement_group_based_on_context from ray.util.placement_group import configure_placement_group_based_on_context
from ray.util.scheduling_strategies import ( from ray.util.scheduling_strategies import (
DEFAULT_SCHEDULING_STRATEGY,
PlacementGroupSchedulingStrategy, PlacementGroupSchedulingStrategy,
SchedulingStrategyT, SchedulingStrategyT,
) )
@ -943,7 +942,7 @@ class ActorClass:
placement_group_capture_child_tasks, placement_group_capture_child_tasks,
) )
else: else:
scheduling_strategy = DEFAULT_SCHEDULING_STRATEGY scheduling_strategy = "DEFAULT"
if runtime_env: if runtime_env:
if isinstance(runtime_env, str): if isinstance(runtime_env, str):

View file

@ -5,7 +5,6 @@ import uuid
from ray import cloudpickle as pickle from ray import cloudpickle as pickle
from ray.util.scheduling_strategies import ( from ray.util.scheduling_strategies import (
DEFAULT_SCHEDULING_STRATEGY,
PlacementGroupSchedulingStrategy, PlacementGroupSchedulingStrategy,
SchedulingStrategyT, SchedulingStrategyT,
) )
@ -416,7 +415,7 @@ class RemoteFunction:
placement_group_capture_child_tasks, placement_group_capture_child_tasks,
) )
else: else:
scheduling_strategy = DEFAULT_SCHEDULING_STRATEGY scheduling_strategy = "DEFAULT"
if not runtime_env or runtime_env == "{}": if not runtime_env or runtime_env == "{}":
runtime_env = self._runtime_env runtime_env = self._runtime_env

View file

@ -8,8 +8,6 @@ import ray
from ray.util.client.ray_client_helpers import connect_to_client_or_not from ray.util.client.ray_client_helpers import connect_to_client_or_not
import ray.experimental.internal_kv as internal_kv import ray.experimental.internal_kv as internal_kv
from ray.util.scheduling_strategies import ( from ray.util.scheduling_strategies import (
DEFAULT_SCHEDULING_STRATEGY,
SPREAD_SCHEDULING_STRATEGY,
PlacementGroupSchedulingStrategy, PlacementGroupSchedulingStrategy,
) )
@ -108,7 +106,7 @@ def test_default_scheduling_strategy(ray_start_cluster, connect_to_client):
with connect_to_client_or_not(connect_to_client): with connect_to_client_or_not(connect_to_client):
@ray.remote(scheduling_strategy=DEFAULT_SCHEDULING_STRATEGY) @ray.remote(scheduling_strategy="DEFAULT")
def get_node_id_1(): def get_node_id_1():
return ray.worker.global_worker.current_node_id return ray.worker.global_worker.current_node_id
@ -127,11 +125,7 @@ def test_default_scheduling_strategy(ray_start_cluster, connect_to_client):
return ray.worker.global_worker.current_node_id return ray.worker.global_worker.current_node_id
assert ( assert (
ray.get( ray.get(get_node_id_2.options(scheduling_strategy="DEFAULT").remote())
get_node_id_2.options(
scheduling_strategy=DEFAULT_SCHEDULING_STRATEGY
).remote()
)
== head_node_id == head_node_id
) )
@ -152,9 +146,7 @@ def test_default_scheduling_strategy(ray_start_cluster, connect_to_client):
# Use parent's placement group # Use parent's placement group
ray.get(get_node_id_3.remote()), ray.get(get_node_id_3.remote()),
ray.get( ray.get(
get_node_id_3.options( get_node_id_3.options(scheduling_strategy="DEFAULT").remote()
scheduling_strategy=DEFAULT_SCHEDULING_STRATEGY
).remote()
), ),
] ]
@ -179,7 +171,7 @@ def test_placement_group_scheduling_strategy(ray_start_cluster, connect_to_clien
with connect_to_client_or_not(connect_to_client): with connect_to_client_or_not(connect_to_client):
@ray.remote(scheduling_strategy=DEFAULT_SCHEDULING_STRATEGY) @ray.remote(scheduling_strategy="DEFAULT")
def get_node_id_1(): def get_node_id_1():
return ray.worker.global_worker.current_node_id return ray.worker.global_worker.current_node_id
@ -286,7 +278,7 @@ def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):
# Wait for updating driver raylet's resource view. # Wait for updating driver raylet's resource view.
time.sleep(5) time.sleep(5)
@ray.remote(scheduling_strategy=SPREAD_SCHEDULING_STRATEGY) @ray.remote(scheduling_strategy="SPREAD")
def task1(): def task1():
internal_kv._internal_kv_put("test_task1", "task1") internal_kv._internal_kv_put("test_task1", "task1")
while internal_kv._internal_kv_exists("test_task1"): while internal_kv._internal_kv_exists("test_task1"):
@ -304,9 +296,7 @@ def test_spread_scheduling_strategy(ray_start_cluster, connect_to_client):
time.sleep(0.1) time.sleep(0.1)
# Wait for updating driver raylet's resource view. # Wait for updating driver raylet's resource view.
time.sleep(5) time.sleep(5)
locations.append( locations.append(task2.options(scheduling_strategy="SPREAD").remote())
task2.options(scheduling_strategy=SPREAD_SCHEDULING_STRATEGY).remote()
)
while not internal_kv._internal_kv_exists("test_task2"): while not internal_kv._internal_kv_exists("test_task2"):
time.sleep(0.1) time.sleep(0.1)
internal_kv._internal_kv_del("test_task1") internal_kv._internal_kv_del("test_task1")

View file

@ -2,13 +2,11 @@ from typing import Union, Optional
from ray.util.annotations import PublicAPI from ray.util.annotations import PublicAPI
from ray.util.placement_group import PlacementGroup from ray.util.placement_group import PlacementGroup
# The default hybrid scheduling strategy # "DEFAULT": The default hybrid scheduling strategy
# based on config scheduler_spread_threshold. # based on config scheduler_spread_threshold.
# This disables any potential placement group capture. # This disables any potential placement group capture.
DEFAULT_SCHEDULING_STRATEGY = "DEFAULT"
# Spread scheduling on a best effort basis. # "SPREAD": Spread scheduling on a best effort basis.
SPREAD_SCHEDULING_STRATEGY = "SPREAD"
@PublicAPI(stability="beta") @PublicAPI(stability="beta")