mirror of
https://github.com/vale981/ray
synced 2025-03-05 18:11:42 -05:00
Remove dead tests related to the old scheduler (#21465)
This commit is contained in:
parent
123aa7cd2b
commit
501b78feaa
4 changed files with 5 additions and 146 deletions
|
@ -680,10 +680,6 @@ def format_web_url(url):
|
|||
return url
|
||||
|
||||
|
||||
def new_scheduler_enabled():
|
||||
return os.environ.get("RAY_ENABLE_NEW_SCHEDULER", "1") == "1"
|
||||
|
||||
|
||||
def client_test_enabled() -> bool:
|
||||
return ray._private.client_mode_hook.is_client_mode_enabled
|
||||
|
||||
|
|
|
@ -13,8 +13,6 @@ from ray._private.test_utils import (
|
|||
wait_for_condition,
|
||||
wait_for_pid_to_exit,
|
||||
generate_system_config_map,
|
||||
get_other_nodes,
|
||||
new_scheduler_enabled,
|
||||
SignalActor,
|
||||
)
|
||||
|
||||
|
@ -323,40 +321,6 @@ def test_actor_restart_on_node_failure(ray_start_cluster):
|
|||
assert result == 1 or result == results[-1] + 1
|
||||
|
||||
|
||||
@pytest.mark.skipif(new_scheduler_enabled(), reason="dynamic resources todo")
|
||||
def test_actor_restart_without_task(ray_start_regular):
|
||||
"""Test a dead actor can be restarted without sending task to it."""
|
||||
|
||||
@ray.remote(max_restarts=1, resources={"actor": 1})
|
||||
class RestartableActor:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def get_pid(self):
|
||||
return os.getpid()
|
||||
|
||||
@ray.remote(resources={"actor": 1})
|
||||
def probe():
|
||||
return
|
||||
|
||||
# Returns whether the "actor" resource is available.
|
||||
def actor_resource_available():
|
||||
p = probe.remote()
|
||||
ready, _ = ray.wait([p], timeout=1)
|
||||
return len(ready) > 0
|
||||
|
||||
ray.experimental.set_resource("actor", 1)
|
||||
actor = RestartableActor.remote()
|
||||
wait_for_condition(lambda: not actor_resource_available())
|
||||
# Kill the actor.
|
||||
pid = ray.get(actor.get_pid.remote())
|
||||
|
||||
p = probe.remote()
|
||||
os.kill(pid, SIGKILL)
|
||||
ray.get(p)
|
||||
wait_for_condition(lambda: not actor_resource_available())
|
||||
|
||||
|
||||
def test_caller_actor_restart(ray_start_regular):
|
||||
"""Test tasks from a restarted actor can be correctly processed
|
||||
by the receiving actor."""
|
||||
|
@ -538,73 +502,6 @@ def test_decorated_method(ray_start_regular):
|
|||
assert ray.get(object_ref) == 7 # 2 * 3 + 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ray_start_cluster", [{
|
||||
"num_cpus": 1,
|
||||
"num_nodes": 3,
|
||||
}], indirect=True)
|
||||
@pytest.mark.skipif(new_scheduler_enabled(), reason="dynamic resources todo")
|
||||
def test_ray_wait_dead_actor(ray_start_cluster):
|
||||
"""Tests that methods completed by dead actors are returned as ready"""
|
||||
cluster = ray_start_cluster
|
||||
|
||||
@ray.remote(num_cpus=1)
|
||||
class Actor:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def node_id(self):
|
||||
return ray.worker.global_worker.node.unique_id
|
||||
|
||||
def ping(self):
|
||||
time.sleep(1)
|
||||
|
||||
# Create some actors and wait for them to initialize.
|
||||
num_nodes = len(cluster.list_all_nodes())
|
||||
actors = [Actor.remote() for _ in range(num_nodes)]
|
||||
ray.get([actor.ping.remote() for actor in actors])
|
||||
|
||||
def actor_dead():
|
||||
# Ping the actors and make sure the tasks complete.
|
||||
ping_ids = [actor.ping.remote() for actor in actors]
|
||||
unready = ping_ids[:]
|
||||
while unready:
|
||||
_, unready = ray.wait(unready, timeout=0)
|
||||
time.sleep(1)
|
||||
|
||||
try:
|
||||
ray.get(ping_ids)
|
||||
return False
|
||||
except ray.exceptions.RayActorError:
|
||||
return True
|
||||
|
||||
# Kill a node that must not be driver node or head node.
|
||||
cluster.remove_node(get_other_nodes(cluster, exclude_head=True)[-1])
|
||||
# Repeatedly submit tasks and call ray.wait until the exception for the
|
||||
# dead actor is received.
|
||||
wait_for_condition(actor_dead)
|
||||
|
||||
# Create an actor on the local node that will call ray.wait in a loop.
|
||||
head_node_resource = "HEAD_NODE"
|
||||
ray.experimental.set_resource(head_node_resource, 1)
|
||||
|
||||
@ray.remote(num_cpus=0, resources={head_node_resource: 1})
|
||||
class ParentActor:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def wait(self):
|
||||
return actor_dead()
|
||||
|
||||
def ping(self):
|
||||
return
|
||||
|
||||
# Repeatedly call ray.wait through the local actor until the exception for
|
||||
# the dead actor is received.
|
||||
parent_actor = ParentActor.remote()
|
||||
wait_for_condition(lambda: ray.get(parent_actor.wait.remote()))
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"ray_start_cluster", [{
|
||||
"num_cpus": 1,
|
||||
|
|
|
@ -14,7 +14,7 @@ import ray.cluster_utils
|
|||
import ray._private.gcs_utils as gcs_utils
|
||||
from ray._private.test_utils import (
|
||||
SignalActor, kill_actor_and_wait_for_failure, put_object,
|
||||
wait_for_condition, new_scheduler_enabled, convert_actor_state)
|
||||
wait_for_condition, convert_actor_state)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -171,30 +171,6 @@ def test_dependency_refcounts(ray_start_regular):
|
|||
check_refcounts({})
|
||||
|
||||
|
||||
@pytest.mark.skipif(new_scheduler_enabled(), reason="dynamic res todo")
|
||||
def test_actor_creation_task(ray_start_regular):
|
||||
@ray.remote
|
||||
def large_object():
|
||||
# This will be spilled to plasma.
|
||||
return np.zeros(10 * 1024 * 1024, dtype=np.uint8)
|
||||
|
||||
@ray.remote(resources={"init": 1})
|
||||
class Actor:
|
||||
def __init__(self, dependency):
|
||||
return
|
||||
|
||||
def ping(self):
|
||||
return
|
||||
|
||||
a = Actor.remote(large_object.remote())
|
||||
ping = a.ping.remote()
|
||||
ready, unready = ray.wait([ping], timeout=1)
|
||||
assert not ready
|
||||
|
||||
ray.experimental.set_resource("init", 1)
|
||||
ray.get(ping)
|
||||
|
||||
|
||||
def test_basic_pinning(one_worker_100MiB):
|
||||
@ray.remote
|
||||
def f(array):
|
||||
|
|
|
@ -16,9 +16,8 @@ import ray.util.accelerators
|
|||
import ray.cluster_utils
|
||||
from ray._private.test_utils import fetch_prometheus
|
||||
|
||||
from ray._private.test_utils import (wait_for_condition, new_scheduler_enabled,
|
||||
Semaphore, object_memory_usage,
|
||||
SignalActor)
|
||||
from ray._private.test_utils import (wait_for_condition, Semaphore,
|
||||
object_memory_usage, SignalActor)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
@ -193,13 +192,7 @@ def test_local_scheduling_first(ray_start_cluster):
|
|||
assert local()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("fast", [True, False])
|
||||
def test_load_balancing_with_dependencies(ray_start_cluster, fast):
|
||||
if fast and new_scheduler_enabled:
|
||||
# Load-balancing on new scheduler can be inefficient if (task
|
||||
# duration:heartbeat interval) is small enough.
|
||||
pytest.skip()
|
||||
|
||||
def test_load_balancing_with_dependencies(ray_start_cluster):
|
||||
# This test ensures that tasks are being assigned to all raylets in a
|
||||
# roughly equal manner even when the tasks have dependencies.
|
||||
cluster = ray_start_cluster
|
||||
|
@ -210,10 +203,7 @@ def test_load_balancing_with_dependencies(ray_start_cluster, fast):
|
|||
|
||||
@ray.remote
|
||||
def f(x):
|
||||
if fast:
|
||||
time.sleep(0.010)
|
||||
else:
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.1)
|
||||
return ray.worker.global_worker.node.unique_id
|
||||
|
||||
# This object will be local to one of the raylets. Make sure
|
||||
|
|
Loading…
Add table
Reference in a new issue