mirror of
https://github.com/vale981/ray
synced 2025-03-05 10:01:43 -05:00
Revert "Revert "[tune] Also interrupt training when SIGUSR1 received"" (#24101)
* Revert "Revert "[tune] Also interrupt training when SIGUSR1 received" (#24085)"
This reverts commit 00595653ed
.
Failure in windows has been addressed by conditionally registering the signal handler if available.
This commit is contained in:
parent
0e2dd40451
commit
bb341eb1e4
4 changed files with 23 additions and 13 deletions
|
@ -9,6 +9,10 @@ If you send a SIGINT signal to the process running ``tune.run()`` (which is
|
|||
usually what happens when you press Ctrl+C in the console), Ray Tune shuts
|
||||
down training gracefully and saves a final experiment-level checkpoint.
|
||||
|
||||
Ray Tune also accepts the SIGUSR1 signal to interrupt training gracefully. This
|
||||
should be used when running Ray Tune in a remote process (e.g. via Ray client)
|
||||
as Ray will filter out SIGINT and SIGTERM signals per default.
|
||||
|
||||
How to resume a Tune run?
|
||||
-------------------------
|
||||
|
||||
|
|
|
@ -177,7 +177,7 @@ class TuneInterruptionTest(unittest.TestCase):
|
|||
thread.join()
|
||||
|
||||
ray.shutdown()
|
||||
del os.environ["TUNE_DISABLE_SIGINT_HANDLER"]
|
||||
os.environ.pop("TUNE_DISABLE_SIGINT_HANDLER", None)
|
||||
|
||||
|
||||
class TuneFailResumeGridTest(unittest.TestCase):
|
||||
|
|
|
@ -700,27 +700,33 @@ def run(
|
|||
)
|
||||
|
||||
original_handler = signal.getsignal(signal.SIGINT)
|
||||
state = {signal.SIGINT: False}
|
||||
state = {"signal": None}
|
||||
|
||||
def sigint_handler(sig, frame):
|
||||
def signal_interrupt_tune_run(sig: int, frame):
|
||||
logger.warning(
|
||||
"SIGINT received (e.g. via Ctrl+C), ending Ray Tune run. "
|
||||
"Stop signal received (e.g. via SIGINT/Ctrl+C), ending Ray Tune run. "
|
||||
"This will try to checkpoint the experiment state one last time. "
|
||||
"Press CTRL+C one more time (or send SIGINT/SIGKILL/SIGTERM) "
|
||||
"Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) "
|
||||
"to skip. "
|
||||
)
|
||||
state[signal.SIGINT] = True
|
||||
state["signal"] = sig
|
||||
# Restore original signal handler to react to future SIGINT signals
|
||||
signal.signal(signal.SIGINT, original_handler)
|
||||
|
||||
# We should only install the handler when it is safe to do so.
|
||||
# When tune.run() is called from worker thread, signal.signal will
|
||||
# fail.
|
||||
allow_signal_catching = True
|
||||
if threading.current_thread() != threading.main_thread():
|
||||
os.environ["TUNE_DISABLE_SIGINT_HANDLER"] = "1"
|
||||
allow_signal_catching = False
|
||||
|
||||
if not int(os.getenv("TUNE_DISABLE_SIGINT_HANDLER", "0")):
|
||||
signal.signal(signal.SIGINT, sigint_handler)
|
||||
if allow_signal_catching:
|
||||
if not int(os.getenv("TUNE_DISABLE_SIGINT_HANDLER", "0")):
|
||||
signal.signal(signal.SIGINT, signal_interrupt_tune_run)
|
||||
|
||||
# Always register SIGUSR1 if available (not available e.g. on Windows)
|
||||
if hasattr(signal, "SIGUSR1"):
|
||||
signal.signal(signal.SIGUSR1, signal_interrupt_tune_run)
|
||||
|
||||
progress_reporter = progress_reporter or detect_reporter()
|
||||
|
||||
|
@ -732,7 +738,7 @@ def run(
|
|||
metric=metric,
|
||||
mode=mode,
|
||||
)
|
||||
while not runner.is_finished() and not state[signal.SIGINT]:
|
||||
while not runner.is_finished() and not state["signal"]:
|
||||
runner.step()
|
||||
if has_verbosity(Verbosity.V1_EXPERIMENT):
|
||||
_report_progress(runner, progress_reporter)
|
||||
|
@ -755,7 +761,7 @@ def run(
|
|||
incomplete_trials += [trial]
|
||||
|
||||
if incomplete_trials:
|
||||
if raise_on_failed_trial and not state[signal.SIGINT]:
|
||||
if raise_on_failed_trial and not state["signal"]:
|
||||
raise TuneError("Trials did not complete", incomplete_trials)
|
||||
else:
|
||||
logger.error("Trials did not complete: %s", incomplete_trials)
|
||||
|
@ -767,7 +773,7 @@ def run(
|
|||
f"({tune_taken:.2f} seconds for the tuning loop)."
|
||||
)
|
||||
|
||||
if state[signal.SIGINT]:
|
||||
if state["signal"]:
|
||||
logger.warning(
|
||||
"Experiment has been interrupted, but the most recent state was "
|
||||
"saved. You can continue running this experiment by passing "
|
||||
|
|
|
@ -320,7 +320,7 @@ def run_tune_script_for_time(
|
|||
# Wait until indicator file exists
|
||||
wait_for_run_or_raise(process, indicator_file=indicator_file, timeout=30)
|
||||
# Stop experiment (with checkpoint) after some time
|
||||
send_signal_after_wait(process, signal=signal.SIGINT, wait=run_time)
|
||||
send_signal_after_wait(process, signal=signal.SIGUSR1, wait=run_time)
|
||||
# Wait until process gracefully terminated
|
||||
wait_until_process_terminated(process, timeout=45)
|
||||
finally:
|
||||
|
|
Loading…
Add table
Reference in a new issue