mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Co-authored-by: Dmitri Gekhtman <62982571+DmitriGekhtman@users.noreply.github.com>
This commit is contained in:
parent
94b170256f
commit
fec30a25db
3 changed files with 28 additions and 16 deletions
|
@ -10,8 +10,7 @@ import time
|
|||
import traceback
|
||||
import json
|
||||
from multiprocessing.synchronize import Event
|
||||
from typing import Optional
|
||||
|
||||
from typing import Any, Callable, Dict, Optional, Union
|
||||
try:
|
||||
import prometheus_client
|
||||
except ImportError:
|
||||
|
@ -131,13 +130,16 @@ class Monitor:
|
|||
redis: A connection to the Redis server.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
address,
|
||||
autoscaling_config,
|
||||
redis_password=None,
|
||||
prefix_cluster_info=False,
|
||||
monitor_ip=None,
|
||||
stop_event: Optional[Event] = None):
|
||||
def __init__(
|
||||
self,
|
||||
address: str,
|
||||
autoscaling_config: Union[str, Callable[[], Dict[str, Any]]],
|
||||
redis_password: Optional[str] = None,
|
||||
prefix_cluster_info: bool = False,
|
||||
monitor_ip: Optional[str] = None,
|
||||
stop_event: Optional[Event] = None,
|
||||
retry_on_failure: bool = True,
|
||||
):
|
||||
if not use_gcs_for_bootstrap():
|
||||
# Initialize the Redis clients.
|
||||
redis_address = address
|
||||
|
@ -196,6 +198,7 @@ class Monitor:
|
|||
self.prefix_cluster_info = prefix_cluster_info
|
||||
# Can be used to signal graceful exit from monitor loop.
|
||||
self.stop_event = stop_event # type: Optional[Event]
|
||||
self.retry_on_failure = retry_on_failure
|
||||
self.autoscaling_config = autoscaling_config
|
||||
self.autoscaler = None
|
||||
# If set, we are in a manually created cluster (non-autoscaling) and
|
||||
|
@ -366,8 +369,12 @@ class Monitor:
|
|||
as_json,
|
||||
overwrite=True)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Monitor: Execution exception. Trying again...")
|
||||
# By default, do not exit the monitor on failure.
|
||||
if self.retry_on_failure:
|
||||
logger.exception(
|
||||
"Monitor: Execution exception. Trying again...")
|
||||
else:
|
||||
raise
|
||||
|
||||
# Wait for an autoscaler update interval before processing the next
|
||||
# round of messages.
|
||||
|
|
|
@ -113,6 +113,7 @@ class RayCluster:
|
|||
redis_password=ray_constants.REDIS_DEFAULT_PASSWORD,
|
||||
prefix_cluster_info=True,
|
||||
stop_event=self.monitor_stop_event,
|
||||
retry_on_failure=False,
|
||||
)
|
||||
mtr.run()
|
||||
|
||||
|
|
|
@ -233,11 +233,15 @@ class KubernetesOperatorTest(unittest.TestCase):
|
|||
pod_spec["containers"][0]["image"] = IMAGE
|
||||
pod_spec["containers"][0]["imagePullPolicy"] = PULL_POLICY
|
||||
|
||||
# Use a custom Redis port for one of the clusters.
|
||||
example_cluster_config["spec"]["headStartRayCommands"][1] += \
|
||||
" --port 6400"
|
||||
example_cluster_config["spec"]["workerStartRayCommands"][1] = \
|
||||
" ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6400"
|
||||
# Use a custom port for one of the clusters.
|
||||
new_head_start_cmd = example_cluster_config["spec"][
|
||||
"headStartRayCommands"][1].replace("6379", "6380")
|
||||
new_worker_start_cmd = example_cluster_config["spec"][
|
||||
"workerStartRayCommands"][1].replace("6379", "6380")
|
||||
example_cluster_config["spec"]["headStartRayCommands"][
|
||||
1] = new_head_start_cmd
|
||||
example_cluster_config["spec"]["workerStartRayCommands"][
|
||||
1] = new_worker_start_cmd
|
||||
|
||||
# Dump to temporary files
|
||||
yaml.dump(example_cluster_config, example_cluster_file)
|
||||
|
|
Loading…
Add table
Reference in a new issue