Pin _StatsActor to the driver node (#27765)

Similar to what's done in #23397

This allows the actor to fate-share with the driver and tolerate worker node failures.
Jiajun Yao 2022-08-10 17:55:06 -07:00 committed by GitHub
parent ddca52d2ca
commit 27e38f81bd
2 changed files with 19 additions and 7 deletions
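
For context, a minimal sketch of the pinning pattern this commit applies, assuming a plain (non-Ray-Client) cluster. The Counter actor is a hypothetical stand-in for _StatsActor; NodeAffinitySchedulingStrategy and ray.get_runtime_context().get_node_id() are the same Ray APIs used in the diff below.

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@ray.remote
class Counter:  # hypothetical stand-in for _StatsActor
    def __init__(self):
        self.value = 0

    def incr(self):
        self.value += 1
        return self.value


ray.init()

# Hard-pin the actor to the node the driver is running on. soft=False means
# the actor may not fall back to another node, so it can only be lost together
# with the driver's node -- i.e. it fate-shares with the driver and survives
# worker node failures.
pinned = Counter.options(
    scheduling_strategy=NodeAffinitySchedulingStrategy(
        ray.get_runtime_context().get_node_id(),
        soft=False,
    )
).remote()

print(ray.get(pinned.incr.remote()))  # -> 1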

@@ -9,6 +9,7 @@ import ray
 from ray.data._internal.block_list import BlockList
 from ray.data.block import BlockMetadata
 from ray.data.context import DatasetContext
+from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


 def fmt(seconds: float) -> str:
@@ -123,10 +124,19 @@ def _get_or_create_stats_actor():
         or _stats_actor[1] != ray.get_runtime_context().job_id.hex()
     ):
         ctx = DatasetContext.get_current()
+        scheduling_strategy = ctx.scheduling_strategy
+        if not ray.util.client.ray.is_connected():
+            # Pin the stats actor to the local node
+            # so it fate-shares with the driver.
+            scheduling_strategy = NodeAffinitySchedulingStrategy(
+                ray.get_runtime_context().get_node_id(),
+                soft=False,
+            )
         _stats_actor[0] = _StatsActor.options(
             name="datasets_stats_actor",
             get_if_exists=True,
-            scheduling_strategy=ctx.scheduling_strategy,
+            scheduling_strategy=scheduling_strategy,
         ).remote()
         _stats_actor[1] = ray.get_runtime_context().job_id.hex()
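
Two details of this hunk are worth spelling out. First, the hard pin is only applied when not connected through Ray Client; in that case the code keeps ctx.scheduling_strategy, presumably because under Ray Client the driver does not run on a cluster node it could meaningfully fate-share with. Second, the name="datasets_stats_actor" / get_if_exists=True combination makes actor creation a get-or-create, so every caller in the job talks to the single pinned actor. A small sketch of that get-or-create behavior (StatsCollector and the actor name here are hypothetical):

import ray

ray.init()


@ray.remote
class StatsCollector:  # hypothetical stand-in for _StatsActor
    def ping(self):
        return "ok"


# The first call creates the named actor; the second returns a handle to the
# same actor instead of creating a duplicate, so whatever scheduling strategy
# was used at creation time applies to all users of the handle.
a = StatsCollector.options(name="demo_stats_actor", get_if_exists=True).remote()
b = StatsCollector.options(name="demo_stats_actor", get_if_exists=True).remote()
assert ray.get(a.ping.remote()) == ray.get(b.ping.remote()) == "ok"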

@@ -37,6 +37,7 @@ from ray.data.datasource.file_based_datasource import (
 from ray.data.row import TableRow
 from ray.types import ObjectRef
 from ray.util.annotations import DeveloperAPI, PublicAPI
+from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

 if TYPE_CHECKING:
     import pandas
@@ -298,18 +299,19 @@ class DatasetPipeline(Generic[T]):
     def _split(
         self, n: int, splitter: Callable[[Dataset], List["Dataset[T]"]]
     ) -> List["DatasetPipeline[T]"]:
-        resources = {}
+        ctx = DatasetContext.get_current()
+        scheduling_strategy = ctx.scheduling_strategy
         if not ray.util.client.ray.is_connected():
             # Pin the coordinator (and any child actors) to the local node to avoid
             # errors during node failures. If the local node dies, then the driver
             # will fate-share with the coordinator anyway.
-            resources["node:{}".format(ray.util.get_node_ip_address())] = 0.0001
-        ctx = DatasetContext.get_current()
+            scheduling_strategy = NodeAffinitySchedulingStrategy(
+                ray.get_runtime_context().get_node_id(),
+                soft=False,
+            )
         coordinator = PipelineSplitExecutorCoordinator.options(
-            resources=resources,
-            scheduling_strategy=ctx.scheduling_strategy,
+            scheduling_strategy=scheduling_strategy,
         ).remote(self, n, splitter, DatasetContext.get_current())
         if self._executed[0]:
             raise RuntimeError("Pipeline cannot be read multiple times.")
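
For reference, a small usage sketch of the path this hunk changes, assuming the Ray 2.x Datasets API of the time; the Worker actor and the sizes are illustrative only. DatasetPipeline.split() goes through _split(), so the PipelineSplitExecutorCoordinator it creates is now pinned to the driver's node (outside of Ray Client) instead of relying on the node-IP custom resource pin used before.

import ray

ray.init()


@ray.remote
class Worker:  # illustrative consumer; not part of this PR
    def consume(self, shard):
        # Each split shard is consumed by its own worker, in parallel.
        return sum(1 for _ in shard.iter_rows())


# Splitting the pipeline creates the (now node-pinned) coordinator actor.
pipe = ray.data.range(8).window(blocks_per_window=2)
shards = pipe.split(2)
workers = [Worker.remote() for _ in shards]
print(ray.get([w.consume.remote(s) for w, s in zip(workers, shards)]))  # e.g. [4, 4]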