From 814b49356c1e773f2ec8d7c643acef48b6db08cf Mon Sep 17 00:00:00 2001 From: xwjiang2010 <87673679+xwjiang2010@users.noreply.github.com> Date: Wed, 16 Mar 2022 20:55:30 -0700 Subject: [PATCH] [tuner] Tuner impl. (#22848) --- .../ray/tune/analysis/experiment_analysis.py | 4 + python/ray/tune/experiment.py | 151 ++++++++++---- python/ray/tune/impl/__init__.py | 0 .../tune/impl/dataset_execution_registry.py | 24 +++ .../impl/out_of_band_serialize_dataset.py | 34 ++++ python/ray/tune/impl/test_utils.py | 65 ++++++ python/ray/tune/impl/tuner_internal.py | 192 ++++++++++++++++++ python/ray/tune/impl/utils.py | 28 +++ python/ray/tune/result_grid.py | 1 - python/ray/tune/trial_runner.py | 4 +- python/ray/tune/tune.py | 3 + python/ray/tune/tuner.py | 97 ++++++++- 12 files changed, 555 insertions(+), 48 deletions(-) create mode 100644 python/ray/tune/impl/__init__.py create mode 100644 python/ray/tune/impl/dataset_execution_registry.py create mode 100644 python/ray/tune/impl/out_of_band_serialize_dataset.py create mode 100644 python/ray/tune/impl/test_utils.py create mode 100644 python/ray/tune/impl/tuner_internal.py create mode 100644 python/ray/tune/impl/utils.py diff --git a/python/ray/tune/analysis/experiment_analysis.py b/python/ray/tune/analysis/experiment_analysis.py index 83d2359ba..0f4c2194f 100644 --- a/python/ray/tune/analysis/experiment_analysis.py +++ b/python/ray/tune/analysis/experiment_analysis.py @@ -535,6 +535,9 @@ class ExperimentAnalysis: values are disregarded and these trials are never selected as the best trial. """ + if len(self.trials) == 1: + return self.trials[0] + metric = self._validate_metric(metric) mode = self._validate_mode(mode) @@ -550,6 +553,7 @@ class ExperimentAnalysis: ) best_trial = None best_metric_score = None + for trial in self.trials: if metric not in trial.metric_analysis: continue diff --git a/python/ray/tune/experiment.py b/python/ray/tune/experiment.py index 6d0708b5c..1c37f7cd5 100644 --- a/python/ray/tune/experiment.py +++ b/python/ray/tune/experiment.py @@ -4,9 +4,10 @@ import grpc import inspect import logging import os +from pathlib import Path from pickle import PicklingError import traceback -from typing import Dict, Sequence, Any +from typing import Any, Dict, Optional, Sequence from ray.tune.error import TuneError from ray.tune.registry import register_trainable @@ -49,6 +50,24 @@ def _validate_log_to_file(log_to_file): return stdout_file, stderr_file +def _get_local_dir_with_expand_user(local_dir: Optional[str]) -> str: + return os.path.abspath(os.path.expanduser(local_dir or DEFAULT_RESULTS_DIR)) + + +def _get_dir_name(run, explicit_name: Optional[str], combined_name: str) -> str: + # If the name has been set explicitly, we don't want to create + # dated directories. The same is true for string run identifiers. + if ( + int(os.environ.get("TUNE_DISABLE_DATED_SUBDIR", 0)) == 1 + or explicit_name + or isinstance(run, str) + ): + dir_name = combined_name + else: + dir_name = "{}_{}".format(combined_name, date_str()) + return dir_name + + @DeveloperAPI class Experiment: """Tracks experiment specifications. @@ -74,6 +93,12 @@ class Experiment: local_dir="~/ray_results", checkpoint_freq=10, max_failures=2) + + Args: + TODO(xwjiang): Add the whole list. + _experiment_checkpoint_dir: Internal use only. If present, use this + as the root directory for experiment checkpoint. If not present, + the directory path will be deduced from trainable name instead. """ # Keys that will be present in `public_spec` dict. 
@@ -89,6 +114,7 @@ class Experiment: resources_per_trial=None, num_samples=1, local_dir=None, + _experiment_checkpoint_dir: Optional[str] = None, sync_config=None, trial_name_creator=None, trial_dirname_creator=None, @@ -102,6 +128,18 @@ class Experiment: restore=None, ): + local_dir = _get_local_dir_with_expand_user(local_dir) + # `_experiment_checkpoint_dir` is for internal use only for better + # support of Tuner API. + # If set, it should be a subpath under `local_dir`. Also deduce `dir_name`. + self._experiment_checkpoint_dir = _experiment_checkpoint_dir + if _experiment_checkpoint_dir: + experiment_checkpoint_dir_path = Path(_experiment_checkpoint_dir) + local_dir_path = Path(local_dir) + assert local_dir_path in experiment_checkpoint_dir_path.parents + # `dir_name` is set by `_experiment_checkpoint_dir` indirectly. + self.dir_name = os.path.relpath(_experiment_checkpoint_dir, local_dir) + config = config or {} sync_config = sync_config or SyncConfig() if ( @@ -139,16 +177,10 @@ class Experiment: self.name = name or self._run_identifier - # If the name has been set explicitly, we don't want to create - # dated directories. The same is true for string run identifiers. - if ( - int(os.environ.get("TUNE_DISABLE_DATED_SUBDIR", 0)) == 1 - or name - or isinstance(run, str) - ): - self.dir_name = self.name - else: - self.dir_name = "{}_{}".format(self.name, date_str()) + if not _experiment_checkpoint_dir: + self.dir_name = _get_dir_name(run, name, self.name) + + assert self.dir_name if sync_config.upload_dir: self.remote_checkpoint_dir = os.path.join( @@ -207,9 +239,7 @@ class Experiment: "config": config, "resources_per_trial": resources_per_trial, "num_samples": num_samples, - "local_dir": os.path.abspath( - os.path.expanduser(local_dir or DEFAULT_RESULTS_DIR) - ), + "local_dir": local_dir, "sync_config": sync_config, "remote_checkpoint_dir": self.remote_checkpoint_dir, "trial_name_creator": trial_name_creator, @@ -258,25 +288,21 @@ class Experiment: return exp @classmethod - def register_if_needed(cls, run_object): - """Registers Trainable or Function at runtime. + def get_trainable_name(cls, run_object): + """Get Trainable name. - Assumes already registered if run_object is a string. - Also, does not inspect interface of given run_object. - - Arguments: + Args: run_object (str|function|class): Trainable to run. If string, assumes it is an ID and does not modify it. Otherwise, returns a string corresponding to the run_object name. Returns: A string representing the trainable identifier. - """ - if isinstance(run_object, str): - return run_object - elif isinstance(run_object, Domain): - logger.warning("Not registering trainable. Resolving as variant.") + Raises: + TuneError: if ``run_object`` passed in is invalid. + """ + if isinstance(run_object, str) or isinstance(run_object, Domain): return run_object elif isinstance(run_object, type) or callable(run_object): name = "DEFAULT" @@ -298,21 +324,67 @@ class Experiment: name = run_object.func.__name__ else: logger.warning("No name detected on trainable. Using {}.".format(name)) - try: - register_trainable(name, run_object) - except (TypeError, PicklingError) as e: - extra_msg = ( - "Other options: " - "\n-Try reproducing the issue by calling " - "`pickle.dumps(trainable)`. " - "\n-If the error is typing-related, try removing " - "the type annotations and try again." 
- ) - raise type(e)(str(e) + " " + extra_msg) from None return name else: raise TuneError("Improper 'run' - not string nor trainable.") + @classmethod + def register_if_needed(cls, run_object): + """Registers Trainable or Function at runtime. + + Assumes already registered if run_object is a string. + Also, does not inspect interface of given run_object. + + Args: + run_object (str|function|class): Trainable to run. If string, + assumes it is an ID and does not modify it. Otherwise, + returns a string corresponding to the run_object name. + + Returns: + A string representing the trainable identifier. + """ + if isinstance(run_object, str): + return run_object + elif isinstance(run_object, Domain): + logger.warning("Not registering trainable. Resolving as variant.") + return run_object + name = cls.get_trainable_name(run_object) + try: + register_trainable(name, run_object) + except (TypeError, PicklingError) as e: + extra_msg = ( + "Other options: " + "\n-Try reproducing the issue by calling " + "`pickle.dumps(trainable)`. " + "\n-If the error is typing-related, try removing " + "the type annotations and try again." + ) + raise type(e)(str(e) + " " + extra_msg) from None + return name + + @classmethod + def get_experiment_checkpoint_dir(cls, run_obj, local_dir=None, name=None): + """Get experiment checkpoint dir without setting up an experiment. + + This is only used internally for better support of Tuner API. + + Args: + run_obj (str|function|class): Trainable to run. + name (str): The name of the experiment specified by user. + local_dir (str): The local_dir path. + + Returns: + Checkpoint directory for experiment. + """ + assert run_obj + local_dir = _get_local_dir_with_expand_user(local_dir) + run_identifier = cls.get_trainable_name(run_obj) + combined_name = name or run_identifier + + dir_name = _get_dir_name(run_obj, name, combined_name) + + return os.path.join(local_dir, dir_name) + @property def stopper(self): return self._stopper @@ -323,8 +395,11 @@ class Experiment: @property def checkpoint_dir(self): - if self.local_dir: - return os.path.join(self.local_dir, self.dir_name) + # Provided when initializing Experiment, if so, return directly. + if self._experiment_checkpoint_dir: + return self._experiment_checkpoint_dir + assert self.local_dir + return os.path.join(self.local_dir, self.dir_name) @property def run_identifier(self): diff --git a/python/ray/tune/impl/__init__.py b/python/ray/tune/impl/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/tune/impl/dataset_execution_registry.py b/python/ray/tune/impl/dataset_execution_registry.py new file mode 100644 index 000000000..c65ee364c --- /dev/null +++ b/python/ray/tune/impl/dataset_execution_registry.py @@ -0,0 +1,24 @@ +from ray.data import Dataset + + +class DatasetExecutionRegistry: + """A class that makes sure each dataset is executed only once. + + Tune's driver process needs to make sure that dataset is fully executed + before sent to trials. Multiple trials can all use the same dataset. + In such case, the dataset should only be executed once (instead of once + per trial). 
+ + This class is only used when resuming trials from checkpoints.""" + + def __init__(self): + self.set = set() + + def execute_if_needed(self, ds: Dataset) -> Dataset: + if ds._uuid not in self.set: + ds = ds.fully_executed() + self.set.add(ds._uuid) + return ds + + +dataset_execution_registry = DatasetExecutionRegistry() diff --git a/python/ray/tune/impl/out_of_band_serialize_dataset.py b/python/ray/tune/impl/out_of_band_serialize_dataset.py new file mode 100644 index 000000000..f546fe0a5 --- /dev/null +++ b/python/ray/tune/impl/out_of_band_serialize_dataset.py @@ -0,0 +1,34 @@ +import contextlib +import traceback + +import ray +from ray.tune.impl.dataset_execution_registry import dataset_execution_registry + + +def _deserialize_and_fully_execute_if_needed(serialized_ds: bytes): + ds = ray.data.Dataset.deserialize_out_of_band(serialized_ds) + return dataset_execution_registry.execute_if_needed(ds) + + +def _reduce(ds: ray.data.Dataset): + tb_list = traceback.format_list(traceback.extract_stack()) + _already_in_out_of_band_serialization = False + for tb in tb_list: + # TODO(xwjiang): Let's make this less hacky. + if "serialize_out_of_band" in tb: + _already_in_out_of_band_serialization = True + break + if not _already_in_out_of_band_serialization: + return _deserialize_and_fully_execute_if_needed, (ds.serialize_out_of_band(),) + else: + return ds.__reduce__() + + +@contextlib.contextmanager +def out_of_band_serialize_dataset(): + context = ray.worker.global_worker.get_serialization_context() + try: + context._register_cloudpickle_reducer(ray.data.Dataset, _reduce) + yield + finally: + context._unregister_cloudpickle_reducer(ray.data.Dataset) diff --git a/python/ray/tune/impl/test_utils.py b/python/ray/tune/impl/test_utils.py new file mode 100644 index 000000000..0cd9b85c6 --- /dev/null +++ b/python/ray/tune/impl/test_utils.py @@ -0,0 +1,65 @@ +from sklearn.datasets import load_breast_cancer + +from ray import tune +from ray.data import read_datasource, Dataset, Datasource, ReadTask +from ray.data.block import BlockMetadata +from ray.tune.impl.utils import execute_dataset + + +# TODO(xwjiang): Enable this when Clark's out-of-band-serialization is landed. 
+class TestDatasource(Datasource): + def prepare_read(self, parallelism: int, **read_args): + import pyarrow as pa + + def load_data(): + data_raw = load_breast_cancer(as_frame=True) + dataset_df = data_raw["data"] + dataset_df["target"] = data_raw["target"] + return [pa.Table.from_pandas(dataset_df)] + + meta = BlockMetadata( + num_rows=None, + size_bytes=None, + schema=None, + input_files=None, + exec_stats=None, + ) + return [ReadTask(load_data, meta)] + + +def gen_dataset_func() -> Dataset: + test_datasource = TestDatasource() + return read_datasource(test_datasource) + + +def test_grid_search(): + ds1 = gen_dataset_func()._experimental_lazy().map(lambda x: x) + ds2 = gen_dataset_func()._experimental_lazy().map(lambda x: x) + assert not ds1._plan._has_final_stage_snapshot() + assert not ds2._plan._has_final_stage_snapshot() + param_space = {"train_dataset": tune.grid_search([ds1, ds2])} + execute_dataset(param_space) + executed_ds = param_space["train_dataset"]["grid_search"] + assert len(executed_ds) == 2 + assert executed_ds[0]._plan._has_final_stage_snapshot() + assert executed_ds[1]._plan._has_final_stage_snapshot() + + +def test_choice(): + ds1 = gen_dataset_func()._experimental_lazy().map(lambda x: x) + ds2 = gen_dataset_func()._experimental_lazy().map(lambda x: x) + assert not ds1._plan._has_final_stage_snapshot() + assert not ds2._plan._has_final_stage_snapshot() + param_space = {"train_dataset": tune.choice([ds1, ds2])} + execute_dataset(param_space) + executed_ds = param_space["train_dataset"].categories + assert len(executed_ds) == 2 + assert executed_ds[0]._plan._has_final_stage_snapshot() + assert executed_ds[1]._plan._has_final_stage_snapshot() + + +if __name__ == "__main__": + import pytest + import sys + + sys.exit(pytest.main(["-v", "-x", __file__])) diff --git a/python/ray/tune/impl/tuner_internal.py b/python/ray/tune/impl/tuner_internal.py new file mode 100644 index 000000000..54f90bd2e --- /dev/null +++ b/python/ray/tune/impl/tuner_internal.py @@ -0,0 +1,192 @@ +import copy +import os +from typing import Any, Callable, Dict, Optional, Type, Union + +import ray.cloudpickle as pickle +from ray.ml.config import RunConfig +from ray.ml.trainer import Trainer +from ray.tune import Experiment, TuneError, ExperimentAnalysis +from ray.tune.impl.utils import execute_dataset +from ray.tune.result_grid import ResultGrid +from ray.tune.trainable import Trainable +from ray.tune.tune import run +from ray.tune.tune_config import TuneConfig + + +_TRAINABLE_PKL = "trainable.pkl" +_TUNER_PKL = "tuner.pkl" +_TRAINABLE_KEY = "_trainable" +_PARAM_SPACE_KEY = "_param_space" + + +class TunerInternal: + """The real implementation behind external facing ``Tuner``. + + The external facing ``Tuner`` multiplexes between local Tuner and remote Tuner + depending on whether in Ray client mode. + + In Ray client mode, external ``Tuner`` wraps ``TunerInternal`` into a remote actor, + which is guaranteed to be placed on head node. + + ``TunerInternal`` can be constructed from fresh, in which case, ``trainable`` needs + to be provided, together with optional ``param_space``, ``tune_config`` and + ``run_config``. + + It can also be restored from a previous failed run (given ``restore_path``). + + Args: + restore_path: The path from where the Tuner can be restored. If provided, None + of the rest args are needed. + trainable: The trainable to be tuned. + param_space: Search space of the tuning job. + One thing to note is that both preprocessor and dataset can be tuned here. 
+ tune_config: Tuning algorithm specific configs. + Refer to ray.tune.tune_config.TuneConfig for more info. + run_config: Runtime configuration that is specific to individual trials. + Refer to ray.ml.config.RunConfig for more info. + """ + + def __init__( + self, + restore_path: str = None, + trainable: Optional[ + Union[ + str, + Callable, + Type[Trainable], + Type[Trainer], + Trainer, + ] + ] = None, + param_space: Optional[Dict[str, Any]] = None, + tune_config: Optional[TuneConfig] = None, + run_config: Optional[RunConfig] = None, + ): + # Restored from Tuner checkpoint. + if restore_path: + trainable_ckpt = os.path.join(restore_path, _TRAINABLE_PKL) + with open(trainable_ckpt, "rb") as fp: + trainable = pickle.load(fp) + + tuner_ckpt = os.path.join(restore_path, _TUNER_PKL) + with open(tuner_ckpt, "rb") as fp: + tuner = pickle.load(fp) + self.__dict__.update(tuner.__dict__) + + self._is_restored = True + self._trainable = trainable + self._experiment_checkpoint_dir = restore_path + return + + # Start from fresh + if not trainable: + raise TuneError("You need to provide a trainable to tune.") + + self._is_restored = False + self._trainable = trainable + self._tune_config = tune_config + self._run_config = run_config + self._experiment_checkpoint_dir = self._setup_create_experiment_checkpoint_dir( + self._run_config + ) + + # Not used for restored Tuner. + self._param_space = param_space + self._process_dataset_param() + + # This needs to happen before `tune.run()` is kicked in. + # This is because currently tune does not exit gracefully if + # run in ray client mode - if crash happens, it just exits immediately + # without allowing for checkpointing tuner and trainable. + # Thus this has to happen before tune.run() so that we can have something + # to restore from. + tuner_ckpt = os.path.join(self._experiment_checkpoint_dir, _TUNER_PKL) + with open(tuner_ckpt, "wb") as fp: + pickle.dump(self, fp) + + trainable_ckpt = os.path.join(self._experiment_checkpoint_dir, _TRAINABLE_PKL) + with open(trainable_ckpt, "wb") as fp: + pickle.dump(self._trainable, fp) + + def _process_dataset_param(self) -> None: + """Dataset needs to be fully executed before sent over to trainables. + + A valid dataset configuration in param space looks like: + "datasets": { + "train_dataset": tune.grid_search([ds1, ds2]), + }, + """ + execute_dataset(self._param_space) + + def _setup_create_experiment_checkpoint_dir( + self, run_config: Optional[RunConfig] + ) -> str: + """Sets up experiment checkpoint dir before actually running the experiment.""" + path = Experiment.get_experiment_checkpoint_dir( + self._convert_trainable(self._trainable), + run_config.local_dir, + run_config.name, + ) + if not os.path.exists(path): + os.makedirs(path) + return path + + # This has to be done through a function signature (@property won't do). 
+ def experiment_checkpoint_dir(self) -> str: + return self._experiment_checkpoint_dir + + @staticmethod + def _convert_trainable(trainable: Any) -> Type[Trainable]: + if isinstance(trainable, Trainer): + trainable = trainable.as_trainable() + else: + trainable = trainable + return trainable + + def fit(self) -> ResultGrid: + trainable = self._convert_trainable(self._trainable) + assert self._experiment_checkpoint_dir + if not self._is_restored: + param_space = copy.deepcopy(self._param_space) + analysis = self._fit_internal(trainable, param_space) + else: + analysis = self._fit_resume(trainable) + analysis._legacy_checkpoint = False + return ResultGrid(analysis) + + def _fit_internal(self, trainable, param_space) -> ExperimentAnalysis: + """Fitting for a fresh Tuner.""" + analysis = run( + trainable, + config={**param_space}, + mode=self._tune_config.mode, + metric=self._tune_config.metric, + num_samples=self._tune_config.num_samples, + search_alg=self._tune_config.search_alg, + scheduler=self._tune_config.scheduler, + name=self._run_config.name, + callbacks=self._run_config.callbacks, + _experiment_checkpoint_dir=self._experiment_checkpoint_dir, + ) + return analysis + + def _fit_resume(self, trainable) -> ExperimentAnalysis: + """Fitting for a restored Tuner.""" + analysis = run( + trainable, + resume=True, + mode=self._tune_config.mode, + metric=self._tune_config.metric, + callbacks=self._run_config.callbacks, + _experiment_checkpoint_dir=self._experiment_checkpoint_dir, + ) + return analysis + + def __getstate__(self): + state = self.__dict__.copy() + state.pop(_TRAINABLE_KEY, None) + state.pop(_PARAM_SPACE_KEY, None) + return state + + def __setstate__(self, state): + self.__dict__.update(state) diff --git a/python/ray/tune/impl/utils.py b/python/ray/tune/impl/utils.py new file mode 100644 index 000000000..12347d255 --- /dev/null +++ b/python/ray/tune/impl/utils.py @@ -0,0 +1,28 @@ +from ray.data import Dataset +from ray.tune.sample import Categorical + + +def execute_dataset(config_dict: dict): + """Going through config dict (params space) and fully execute any Dataset + if necessary. + """ + for k, v in config_dict.items(): + if isinstance(v, dict): + execute_dataset(v) + elif isinstance(v, Dataset): + config_dict[k] = v.fully_executed() + # TODO(xwjiang): Consider CV config for beta. + # elif isinstance(v, int): + # # CV settings + # pass + elif isinstance(v, list) or isinstance(v, Categorical): + _list = v if isinstance(v, list) else v.categories + if len(_list) == 0: + return + if isinstance(_list[0], Dataset): + if isinstance(v, list): + config_dict[k] = [_item.fully_executed() for _item in _list] + else: # Categorical + config_dict[k].categories = [ + _item.fully_executed() for _item in _list + ] diff --git a/python/ray/tune/result_grid.py b/python/ray/tune/result_grid.py index dc04ed49a..c114b9b75 100644 --- a/python/ray/tune/result_grid.py +++ b/python/ray/tune/result_grid.py @@ -62,7 +62,6 @@ class ResultGrid: return None def _trial_to_result(self, trial: Trial) -> Result: - # TODO(xwjiang): Use Kai's new checkpoint! 
result = Result( checkpoint=trial.checkpoint, metrics=trial.last_result, diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 62a7cb3ee..8060d29fb 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -10,6 +10,7 @@ import traceback import warnings import ray +from ray.tune.impl.out_of_band_serialize_dataset import out_of_band_serialize_dataset from ray.util import get_node_ip_address from ray.tune import TuneError from ray.tune.callback import CallbackList @@ -160,7 +161,8 @@ class _ExperimentCheckpointManager: search_alg.save_to_dir(self._checkpoint_dir, session_str=self._session_str) checkpoint_time_start = time.monotonic() - _serialize_and_write() + with out_of_band_serialize_dataset(): + _serialize_and_write() if self._sync_trial_checkpoints: exclude = None diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py index 80ee13861..89d9131f2 100644 --- a/python/ray/tune/tune.py +++ b/python/ray/tune/tune.py @@ -121,6 +121,8 @@ def run( raise_on_failed_trial: bool = True, callbacks: Optional[Sequence[Callback]] = None, max_concurrent_trials: Optional[int] = None, + # == internal only == + _experiment_checkpoint_dir: Optional[str] = None, # Deprecated args queue_trials: Optional[bool] = None, loggers: Optional[Sequence[Type[Logger]]] = None, @@ -474,6 +476,7 @@ def run( resources_per_trial=resources_per_trial, num_samples=num_samples, local_dir=local_dir, + _experiment_checkpoint_dir=_experiment_checkpoint_dir, sync_config=sync_config, trial_name_creator=trial_name_creator, trial_dirname_creator=trial_dirname_creator, diff --git a/python/ray/tune/tuner.py b/python/ray/tune/tuner.py index 4031bcd8c..e9da36892 100644 --- a/python/ray/tune/tuner.py +++ b/python/ray/tune/tuner.py @@ -1,12 +1,29 @@ +from typing import Any, Callable, Dict, Optional, Type, Union + +import ray + +from ray.ml.config import RunConfig +from ray.ml.trainer import Trainer +from ray.tune import TuneError from ray.tune.result_grid import ResultGrid +from ray.tune.trainable import Trainable +from ray.tune.impl.tuner_internal import TunerInternal +from ray.tune.tune_config import TuneConfig from ray.util import PublicAPI +from ray.util.client.common import ClientActorHandle +from ray.util.ml_utils.node import force_on_current_node + + +# The magic key that is used when instantiating Tuner during resume. +_TUNER_INTERNAL = "_tuner_internal" +_SELF = "self" @PublicAPI(stability="alpha") class Tuner: """Tuner is the recommended way of launching hyperparameter tuning jobs with Ray Tune. - Attributes: + Args: trainable: The trainable to be tuned. param_space: Search space of the tuning job. One thing to note is that both preprocessor and dataset can be tuned here. @@ -15,9 +32,6 @@ class Tuner: run_config: Runtime configuration that is specific to individual trials. Refer to ray.ml.config.RunConfig for more info. - Returns: - ``ResultGrid`` object. - Usage pattern: .. code-block:: python @@ -52,10 +66,49 @@ class Tuner: tuner = Tuner.restore(experiment_checkpoint_dir) tuner.fit() - `experiment_checkpoint_dir` can be easily located near the end of the + ``experiment_checkpoint_dir`` can be easily located near the end of the console output of your first failed run. """ + # One of the following is assigned. + _local_tuner: Optional[TunerInternal] # Only used in none ray client mode. + _remote_tuner: Optional[ClientActorHandle] # Only used in ray client mode. 
+ + def __init__( + self, + trainable: Optional[ + Union[ + str, + Callable, + Type[Trainable], + Type[Trainer], + Trainer, + ] + ] = None, + param_space: Optional[Dict[str, Any]] = None, + tune_config: Optional[TuneConfig] = None, + run_config: Optional[RunConfig] = None, + # This is internal only arg. + _tuner_internal: Optional[TunerInternal] = None, + ): + """Configure and construct a tune run.""" + kwargs = locals().copy() + self._is_ray_client = ray.util.client.ray.is_connected() + if _tuner_internal: + if not self._is_ray_client: + self._local_tuner = kwargs[_TUNER_INTERNAL] + else: + self._remote_tuner = kwargs[_TUNER_INTERNAL] + else: + kwargs.pop(_TUNER_INTERNAL, None) + kwargs.pop(_SELF, None) + if not self._is_ray_client: + self._local_tuner = TunerInternal(**kwargs) + else: + self._remote_tuner = force_on_current_node( + ray.remote(num_cpus=0)(TunerInternal) + ).remote(**kwargs) + @classmethod def restore(cls, path: str) -> "Tuner": """Restores Tuner after a previously failed run. @@ -71,7 +124,14 @@ class Tuner: # retored runs. # For example, is callbacks supposed to be automatically applied # when a Tuner is restored and fit again? - raise NotImplementedError + if not ray.util.client.ray.is_connected(): + tuner_internal = TunerInternal(restore_path=path) + return Tuner(tuner_internal=tuner_internal) + else: + tuner_internal = force_on_current_node( + ray.remote(num_cpus=0)(TunerInternal) + ).remote(restore_path=path) + return Tuner(tuner_internal=tuner_internal) def fit(self) -> ResultGrid: """Executes hyperparameter tuning job as configured and returns result. @@ -80,7 +140,7 @@ class Tuner: For the kind of exception that happens during the execution of a trial, one may inspect it together with stacktrace through the returned result grid. See ``ResultGrid`` for reference. Each trial may fail up to a certain number. - This is configured by `RunConfig.FailureConfig.max_failures`. + This is configured by ``RunConfig.FailureConfig.max_failures``. Exception that happens beyond trials will be thrown by this method as well. In such cases, there will be instruction like the following printed out @@ -96,4 +156,25 @@ class Tuner: TuneError: If errors occur executing the experiment that originate from Tune. """ - raise NotImplementedError + + if not self._is_ray_client: + try: + return self._local_tuner.fit() + except Exception as e: + raise TuneError( + f"Tune run failed." + f'Please use tuner = Tuner.restore("' + f'{self._local_tuner.experiment_checkpoint_dir}") to resume.' + ) from e + else: + experiment_checkpoint_dir = ray.get( + self._remote_tuner.experiment_checkpoint_dir.remote() + ) + try: + return ray.get(self._remote_tuner.fit.remote()) + except Exception as e: + raise TuneError( + f"Tune run failed." + f'Please use tuner = Tuner.restore("' + f'{experiment_checkpoint_dir}") to resume.' + ) from e
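
For reviewers, a minimal sketch of the end-to-end flow this patch enables, based only on the signatures introduced above. The trainable, metric name, search-space values, and local_dir are illustrative placeholders; TuneConfig and RunConfig are used only with the fields this patch actually reads (metric, mode, num_samples, name, local_dir, callbacks).

from ray import tune
from ray.ml.config import RunConfig
from ray.tune.tune_config import TuneConfig
from ray.tune.tuner import Tuner


def train_fn(config):
    # Report a toy metric so the best trial can be selected by metric/mode.
    tune.report(score=config["lr"] * 2)


tuner = Tuner(
    trainable=train_fn,
    param_space={"lr": tune.grid_search([0.01, 0.1])},
    tune_config=TuneConfig(metric="score", mode="max", num_samples=1),
    run_config=RunConfig(name="tuner_demo", local_dir="~/ray_results"),
)
result_grid = tuner.fit()

# If fit() raises, the TuneError raised by Tuner.fit() names the experiment
# checkpoint dir; a later session can resume from it:
#     tuner = Tuner.restore("~/ray_results/tuner_demo")
#     tuner.fit()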
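
The dataset handling in tuner_internal.py and impl/utils.py can be summarized with a small sketch. The dataset construction below is a placeholder; execute_dataset is the helper added in this patch. Datasets nested in the search space, including inside tune.grid_search / tune.choice, are fully executed on the driver before any trial is launched.

import ray
from ray import tune
from ray.tune.impl.utils import execute_dataset

ds1 = ray.data.range(8).map(lambda x: x * 2)  # placeholder datasets
ds2 = ray.data.range(8).map(lambda x: x * 3)

param_space = {
    "datasets": {
        "train_dataset": tune.grid_search([ds1, ds2]),
    },
}

# Walks nested dicts, lists, and Categorical domains, replacing each Dataset
# with its fully executed counterpart (Dataset.fully_executed()).
execute_dataset(param_space)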
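
The experiment-checkpoint side is symmetric: trial_runner.py now wraps its checkpoint write in out_of_band_serialize_dataset(), so Datasets reachable from searcher/trial state are pickled via Dataset.serialize_out_of_band() and, when the checkpoint is later loaded, re-executed at most once through dataset_execution_registry. The snippet below is a rough sketch of that pattern only; write_experiment_state, search_state, and the file path are hypothetical, and it assumes the registered cloudpickle reducer applies to direct ray.cloudpickle dumps the same way it does inside _ExperimentCheckpointManager.checkpoint.

import ray.cloudpickle as cloudpickle
from ray.tune.impl.out_of_band_serialize_dataset import (
    out_of_band_serialize_dataset,
)


def write_experiment_state(search_state, path):  # hypothetical helper
    # Inside this context, ray.data.Dataset objects are reduced via
    # serialize_out_of_band() instead of being pickled inline.
    with out_of_band_serialize_dataset():
        with open(path, "wb") as f:
            cloudpickle.dump(search_state, f)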