Render tasks that are not schedulable on the dashboard. (#7034)

This commit is contained in:
SangBin Cho 2020-02-10 14:23:06 -08:00 committed by GitHub
parent 3f99be8dad
commit 1e690673d8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 181 additions and 20 deletions

View file

@ -103,7 +103,7 @@ export interface RayletInfoResponse {
actorTitle: string;
averageTaskExecutionSpeed: number;
children: RayletInfoResponse["actors"];
currentTaskFuncDesc: string[];
// currentTaskFuncDesc: string[];
ipAddress: string;
isDirectCall: boolean;
jobId: string;
@ -124,8 +124,10 @@ export interface RayletInfoResponse {
}
| {
actorId: string;
actorTitle: string;
requiredResources: { [key: string]: number };
state: -1;
invalidStateType?: 'infeasibleActor' | 'pendingActor';
};
};
}

View file

@ -13,6 +13,7 @@ import {
} from "../../../api";
import Actors from "./Actors";
import Collapse from "@material-ui/core/Collapse";
import orange from '@material-ui/core/colors/orange';
const styles = (theme: Theme) =>
createStyles({
@ -34,9 +35,12 @@ const styles = (theme: Theme) =>
cursor: "pointer"
}
},
infeasible: {
invalidStateTypeInfeasible: {
color: theme.palette.error.main
},
invalidStateTypePendingActor: {
color: orange[500]
},
information: {
fontSize: "0.875rem"
},
@ -161,11 +165,11 @@ class Actor extends React.Component<Props & WithStyles<typeof styles>, State> {
{
label: "UsedLocalObjectMemory",
value: actor.usedObjectStoreMemory.toLocaleString()
},
{
label: "Task",
value: actor.currentTaskFuncDesc.join(".")
}
// {
// label: "Task",
// value: actor.currentTaskFuncDesc.join(".")
// }
]
: [
{
@ -285,8 +289,16 @@ class Actor extends React.Component<Props & WithStyles<typeof styles>, State> {
)
)}
</React.Fragment>
) : actor.invalidStateType === 'infeasibleActor' ? (
<span className={classes.invalidStateTypeInfeasible}>
{actor.actorTitle} is infeasible.
(Infeasible actor means an actor cannot be created because
Ray cluster cannot satisfy resources requirement).
</span>
) : (
<span className={classes.infeasible}>Infeasible actor</span>
<span className={classes.invalidStateTypePendingActor}>
{actor.actorTitle} is pending until resources are available.
</span>
)}
</Typography>
<Typography className={classes.information}>

View file

@ -201,8 +201,13 @@ class Dashboard(object):
}
infeasible_tasks = sum(
(data.get("infeasibleTasks", []) for data in D.values()), [])
# ready_tasks are used to render tasks that are not schedulable
# due to resource limitations.
# (e.g., Actor requires 2 GPUs but there is only 1 gpu available).
ready_tasks = sum(
(data.get("readyTasks", []) for data in D.values()), [])
actor_tree = self.node_stats.get_actor_tree(
workers_info_by_node, infeasible_tasks)
workers_info_by_node, infeasible_tasks, ready_tasks)
for address, data in D.items():
# process view data
measures_dicts = {}
@ -418,7 +423,8 @@ class NodeStats(threading.Thread):
"error_counts": self.calculate_error_counts(),
}
def get_actor_tree(self, workers_info_by_node, infeasible_tasks) -> Dict:
def get_actor_tree(self, workers_info_by_node, infeasible_tasks,
ready_tasks) -> Dict:
now = time.time()
# construct flattened actor tree
flattened_tree = {"root": {"children": {}}}
@ -449,17 +455,27 @@ class NodeStats(threading.Thread):
actor_info["nodeId"] = node_id
actor_info["pid"] = worker_info["pid"]
for infeasible_task in infeasible_tasks:
def _update_flatten_tree(task, task_spec_type, invalid_state_type):
actor_id = ray.utils.binary_to_hex(
b64decode(
infeasible_task["actorCreationTaskSpec"]["actorId"]))
caller_addr = (infeasible_task["callerAddress"]["ipAddress"],
str(infeasible_task["callerAddress"]["port"]))
b64decode(task[task_spec_type]["actorId"]))
caller_addr = (task["callerAddress"]["ipAddress"],
str(task["callerAddress"]["port"]))
caller_id = self._addr_to_actor_id.get(caller_addr, "root")
child_to_parent[actor_id] = caller_id
infeasible_task["state"] = -1
format_reply_id(infeasible_tasks)
flattened_tree[actor_id] = infeasible_task
task["state"] = -1
task["invalidStateType"] = invalid_state_type
task["actorTitle"] = task["functionDescriptor"][
"pythonFunctionDescriptor"]["className"]
format_reply_id(task)
flattened_tree[actor_id] = task
for infeasible_task in infeasible_tasks:
_update_flatten_tree(infeasible_task, "actorCreationTaskSpec",
"infeasibleActor")
for ready_task in ready_tasks:
_update_flatten_tree(ready_task, "actorCreationTaskSpec",
"pendingActor")
# construct actor tree
actor_tree = flattened_tree

View file

@ -158,6 +158,40 @@ def wait_for_condition(condition_predictor,
return False
def wait_until_succeeded_without_exception(func,
exceptions,
*args,
timeout_ms=1000,
retry_interval_ms=100):
"""A helper function that waits until a given function
completes without exceptions.
Args:
func: A function to run.
exceptions(tuple): Exceptions that are supposed to occur.
args: arguments to pass for a given func
timeout_ms: Maximum timeout in milliseconds.
retry_interval_ms: Retry interval in milliseconds.
Return:
Whether exception occurs within a timeout.
"""
if type(exceptions) != tuple:
print("exceptions arguments should be given as a tuple")
return False
time_elapsed = 0
start = time.time()
while time_elapsed <= timeout_ms:
try:
func(*args)
return True
except exceptions:
time_elapsed = (time.time() - start) * 1000
time.sleep(retry_interval_ms / 1000.0)
return False
def recursive_fnmatch(dirpath, pattern):
"""Looks at a file directory subtree for a filename pattern.

View file

@ -11,7 +11,8 @@ from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
from ray.core.generated import reporter_pb2
from ray.core.generated import reporter_pb2_grpc
from ray.test_utils import RayTestTimeoutException
from ray.test_utils import (RayTestTimeoutException,
wait_until_succeeded_without_exception)
def test_worker_stats(shutdown_only):
@ -249,6 +250,86 @@ def test_raylet_info_endpoint(shutdown_only):
time.sleep(1)
def test_raylet_infeasible_tasks(shutdown_only):
"""
This test creates an actor that requires 5 GPUs
but a ray cluster only has 3 GPUs. As a result,
the new actor should be an infeasible actor.
"""
addresses = ray.init(num_gpus=3)
@ray.remote(num_gpus=5)
class ActorRequiringGPU:
def __init__(self):
pass
ActorRequiringGPU.remote()
def test_infeasible_actor(ray_addresses):
webui_url = ray_addresses["webui_url"].replace("localhost",
"http://127.0.0.1")
raylet_info = requests.get(webui_url + "/api/raylet_info").json()
actor_info = raylet_info["result"]["actors"]
assert len(actor_info) == 1
_, infeasible_actor_info = actor_info.popitem()
assert infeasible_actor_info["state"] == -1
assert infeasible_actor_info["invalidStateType"] == "infeasibleActor"
assert (wait_until_succeeded_without_exception(
test_infeasible_actor,
(AssertionError, requests.exceptions.ConnectionError),
addresses,
timeout_ms=30000,
retry_interval_ms=1000) is True)
def test_raylet_pending_tasks(shutdown_only):
# Make sure to specify num_cpus. Otherwise, the test can be broken
# when the number of cores is less than the number of spawned actors.
addresses = ray.init(num_gpus=3, num_cpus=4)
@ray.remote(num_gpus=1)
class ActorRequiringGPU:
def __init__(self):
pass
@ray.remote
class ParentActor:
def __init__(self):
self.a = [ActorRequiringGPU.remote() for i in range(4)]
ParentActor.remote()
def test_pending_actor(ray_addresses):
webui_url = ray_addresses["webui_url"].replace("localhost",
"http://127.0.0.1")
raylet_info = requests.get(webui_url + "/api/raylet_info").json()
actor_info = raylet_info["result"]["actors"]
assert len(actor_info) == 1
_, infeasible_actor_info = actor_info.popitem()
# Verify there are 4 spawned actors.
children = infeasible_actor_info["children"]
assert len(children) == 4
pending_actor_detected = 0
for child_id, child in children.items():
if ("invalidStateType" in child
and child["invalidStateType"] == "pendingActor"):
pending_actor_detected += 1
# 4 GPUActors are spawned although there are only 3 GPUs.
# One actor should be in the pending state.
assert pending_actor_detected == 1
assert (wait_until_succeeded_without_exception(
test_pending_actor,
(AssertionError, requests.exceptions.ConnectionError),
addresses,
timeout_ms=30000,
retry_interval_ms=1000) is True)
@pytest.mark.skipif(
os.environ.get("TRAVIS") is None,
reason="This test requires password-less sudo due to py-spy requirement.")

View file

@ -72,6 +72,7 @@ message GetNodeStatsReply {
repeated ViewData view_data = 2;
uint32 num_workers = 3;
repeated TaskSpec infeasible_tasks = 4;
repeated TaskSpec ready_tasks = 5;
}
// Service for inter-node-manager communication.

View file

@ -3132,9 +3132,24 @@ void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request,
worker_stats->set_pid(driver->Pid());
worker_stats->set_is_driver(true);
}
// NOTE(sang): Currently reporting only infeasible/ready ActorCreationTask
// because Ray dashboard only renders actorCreationTask as of Feb 3 2020.
// TODO(sang): Support dashboard for non-ActorCreationTask.
for (const auto task : local_queues_.GetTasks(TaskState::INFEASIBLE)) {
auto infeasible_task = reply->add_infeasible_tasks();
infeasible_task->ParseFromString(task.GetTaskSpecification().Serialize());
if (task.GetTaskSpecification().IsActorCreationTask()) {
auto infeasible_task = reply->add_infeasible_tasks();
infeasible_task->ParseFromString(task.GetTaskSpecification().Serialize());
}
}
// Report tasks that are not scheduled because
// resources are occupied by other actors/tasks.
// NOTE(sang): This solution is a workaround. It can be replaced by creating a new state
// like PENDING_UNTIL_RESOURCE_AVAILABLE.
for (const auto task : local_queues_.GetTasks(TaskState::READY)) {
if (task.GetTaskSpecification().IsActorCreationTask()) {
auto ready_task = reply->add_ready_tasks();
ready_task->ParseFromString(task.GetTaskSpecification().Serialize());
}
}
// Ensure we never report an empty set of metrics.
if (!recorded_metrics_) {