[tuner] Tuner impl. (#22848)

xwjiang2010 2022-03-16 20:55:30 -07:00 committed by GitHub
parent 83986a4d83
commit 814b49356c
12 changed files with 555 additions and 48 deletions

@ -535,6 +535,9 @@ class ExperimentAnalysis:
values are disregarded and these trials are never selected as
the best trial.
"""
if len(self.trials) == 1:
return self.trials[0]
metric = self._validate_metric(metric)
mode = self._validate_mode(mode)
@ -550,6 +553,7 @@ class ExperimentAnalysis:
)
best_trial = None
best_metric_score = None
for trial in self.trials:
if metric not in trial.metric_analysis:
continue

@ -4,9 +4,10 @@ import grpc
import inspect
import logging
import os
from pathlib import Path
from pickle import PicklingError
import traceback
from typing import Dict, Sequence, Any
from typing import Any, Dict, Optional, Sequence
from ray.tune.error import TuneError
from ray.tune.registry import register_trainable
@ -49,6 +50,24 @@ def _validate_log_to_file(log_to_file):
return stdout_file, stderr_file
def _get_local_dir_with_expand_user(local_dir: Optional[str]) -> str:
return os.path.abspath(os.path.expanduser(local_dir or DEFAULT_RESULTS_DIR))
def _get_dir_name(run, explicit_name: Optional[str], combined_name: str) -> str:
# If the name has been set explicitly, we don't want to create
# dated directories. The same is true for string run identifiers.
if (
int(os.environ.get("TUNE_DISABLE_DATED_SUBDIR", 0)) == 1
or explicit_name
or isinstance(run, str)
):
dir_name = combined_name
else:
dir_name = "{}_{}".format(combined_name, date_str())
return dir_name
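As a side note (not part of the diff), here is a minimal sketch of how the new `_get_dir_name` helper resolves directory names; the trainable function, experiment name, and import path are hypothetical:

import os
from ray.tune.experiment import _get_dir_name  # assumed module path for this helper

def my_trainable_fn(config):  # hypothetical trainable
    pass

os.environ.pop("TUNE_DISABLE_DATED_SUBDIR", None)
# Explicit experiment name: no dated suffix.
assert _get_dir_name(my_trainable_fn, "my_exp", "my_exp") == "my_exp"
# String run identifier: also no dated suffix.
assert _get_dir_name("my_trainable", None, "my_trainable") == "my_trainable"
# Otherwise a dated suffix is appended, e.g. "my_trainable_fn_2022-03-16_20-55-30".
print(_get_dir_name(my_trainable_fn, None, "my_trainable_fn"))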
@DeveloperAPI
class Experiment:
"""Tracks experiment specifications.
@ -74,6 +93,12 @@ class Experiment:
local_dir="~/ray_results",
checkpoint_freq=10,
max_failures=2)
Args:
TODO(xwjiang): Add the whole list.
_experiment_checkpoint_dir: Internal use only. If present, use this
as the root directory for the experiment checkpoint. If not present,
the directory path will be deduced from the trainable name instead.
"""
# Keys that will be present in `public_spec` dict.
@ -89,6 +114,7 @@ class Experiment:
resources_per_trial=None,
num_samples=1,
local_dir=None,
_experiment_checkpoint_dir: Optional[str] = None,
sync_config=None,
trial_name_creator=None,
trial_dirname_creator=None,
@ -102,6 +128,18 @@ class Experiment:
restore=None,
):
local_dir = _get_local_dir_with_expand_user(local_dir)
# `_experiment_checkpoint_dir` is for internal use only, to better
# support the Tuner API.
# If set, it should be a subpath under `local_dir`; `dir_name` is deduced from it.
self._experiment_checkpoint_dir = _experiment_checkpoint_dir
if _experiment_checkpoint_dir:
experiment_checkpoint_dir_path = Path(_experiment_checkpoint_dir)
local_dir_path = Path(local_dir)
assert local_dir_path in experiment_checkpoint_dir_path.parents
# `dir_name` is set by `_experiment_checkpoint_dir` indirectly.
self.dir_name = os.path.relpath(_experiment_checkpoint_dir, local_dir)
config = config or {}
sync_config = sync_config or SyncConfig()
if (
@ -139,16 +177,10 @@ class Experiment:
self.name = name or self._run_identifier
# If the name has been set explicitly, we don't want to create
# dated directories. The same is true for string run identifiers.
if (
int(os.environ.get("TUNE_DISABLE_DATED_SUBDIR", 0)) == 1
or name
or isinstance(run, str)
):
self.dir_name = self.name
else:
self.dir_name = "{}_{}".format(self.name, date_str())
if not _experiment_checkpoint_dir:
self.dir_name = _get_dir_name(run, name, self.name)
assert self.dir_name
if sync_config.upload_dir:
self.remote_checkpoint_dir = os.path.join(
@ -207,9 +239,7 @@ class Experiment:
"config": config,
"resources_per_trial": resources_per_trial,
"num_samples": num_samples,
"local_dir": os.path.abspath(
os.path.expanduser(local_dir or DEFAULT_RESULTS_DIR)
),
"local_dir": local_dir,
"sync_config": sync_config,
"remote_checkpoint_dir": self.remote_checkpoint_dir,
"trial_name_creator": trial_name_creator,
@ -258,25 +288,21 @@ class Experiment:
return exp
@classmethod
def register_if_needed(cls, run_object):
"""Registers Trainable or Function at runtime.
def get_trainable_name(cls, run_object):
"""Get Trainable name.
Assumes already registered if run_object is a string.
Also, does not inspect interface of given run_object.
Arguments:
Args:
run_object (str|function|class): Trainable to run. If string,
assumes it is an ID and does not modify it. Otherwise,
returns a string corresponding to the run_object name.
Returns:
A string representing the trainable identifier.
"""
if isinstance(run_object, str):
return run_object
elif isinstance(run_object, Domain):
logger.warning("Not registering trainable. Resolving as variant.")
Raises:
TuneError: if ``run_object`` passed in is invalid.
"""
if isinstance(run_object, str) or isinstance(run_object, Domain):
return run_object
elif isinstance(run_object, type) or callable(run_object):
name = "DEFAULT"
@ -298,6 +324,31 @@ class Experiment:
name = run_object.func.__name__
else:
logger.warning("No name detected on trainable. Using {}.".format(name))
return name
else:
raise TuneError("Improper 'run' - not string nor trainable.")
@classmethod
def register_if_needed(cls, run_object):
"""Registers Trainable or Function at runtime.
Assumes already registered if run_object is a string.
Also, does not inspect interface of given run_object.
Args:
run_object (str|function|class): Trainable to run. If string,
assumes it is an ID and does not modify it. Otherwise,
returns a string corresponding to the run_object name.
Returns:
A string representing the trainable identifier.
"""
if isinstance(run_object, str):
return run_object
elif isinstance(run_object, Domain):
logger.warning("Not registering trainable. Resolving as variant.")
return run_object
name = cls.get_trainable_name(run_object)
try:
register_trainable(name, run_object)
except (TypeError, PicklingError) as e:
@ -310,8 +361,29 @@ class Experiment:
)
raise type(e)(str(e) + " " + extra_msg) from None
return name
else:
raise TuneError("Improper 'run' - not string nor trainable.")
@classmethod
def get_experiment_checkpoint_dir(cls, run_obj, local_dir=None, name=None):
"""Get experiment checkpoint dir without setting up an experiment.
This is only used internally, to better support the Tuner API.
Args:
run_obj (str|function|class): Trainable to run.
name (str): The name of the experiment specified by user.
local_dir (str): The local_dir path.
Returns:
Checkpoint directory for experiment.
"""
assert run_obj
local_dir = _get_local_dir_with_expand_user(local_dir)
run_identifier = cls.get_trainable_name(run_obj)
combined_name = name or run_identifier
dir_name = _get_dir_name(run_obj, name, combined_name)
return os.path.join(local_dir, dir_name)
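For illustration only, a sketch of how the Tuner layer (introduced below) is expected to call this classmethod; the trainable and paths are hypothetical:

from ray.tune import Experiment

def my_trainable_fn(config):  # hypothetical trainable
    pass

# With an explicit name there is no dated suffix, so this resolves to
# "<expanded local_dir>/my_exp", e.g. "/home/user/ray_results/my_exp".
ckpt_dir = Experiment.get_experiment_checkpoint_dir(
    my_trainable_fn, local_dir="~/ray_results", name="my_exp"
)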
@property
def stopper(self):
@ -323,7 +395,10 @@ class Experiment:
@property
def checkpoint_dir(self):
if self.local_dir:
# Provided when initializing Experiment; if so, return it directly.
if self._experiment_checkpoint_dir:
return self._experiment_checkpoint_dir
assert self.local_dir
return os.path.join(self.local_dir, self.dir_name)
@property

@ -0,0 +1,24 @@
from ray.data import Dataset
class DatasetExecutionRegistry:
"""A class that makes sure each dataset is executed only once.
Tune's driver process needs to make sure that a dataset is fully executed
before it is sent to trials. Multiple trials may all use the same dataset;
in that case, the dataset should only be executed once (instead of once
per trial).
This class is only used when resuming trials from checkpoints."""
def __init__(self):
self.set = set()
def execute_if_needed(self, ds: Dataset) -> Dataset:
if ds._uuid not in self.set:
ds = ds.fully_executed()
self.set.add(ds._uuid)
return ds
dataset_execution_registry = DatasetExecutionRegistry()
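A minimal sketch (not part of the diff) of the execute-once behavior; the toy dataset is hypothetical:

import ray
from ray.tune.impl.dataset_execution_registry import dataset_execution_registry

ds = ray.data.range(8)
first = dataset_execution_registry.execute_if_needed(ds)   # runs ds.fully_executed()
second = dataset_execution_registry.execute_if_needed(ds)  # uuid already recorded; returned as-is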

@ -0,0 +1,34 @@
import contextlib
import traceback
import ray
from ray.tune.impl.dataset_execution_registry import dataset_execution_registry
def _deserialize_and_fully_execute_if_needed(serialized_ds: bytes):
ds = ray.data.Dataset.deserialize_out_of_band(serialized_ds)
return dataset_execution_registry.execute_if_needed(ds)
def _reduce(ds: ray.data.Dataset):
tb_list = traceback.format_list(traceback.extract_stack())
_already_in_out_of_band_serialization = False
for tb in tb_list:
# TODO(xwjiang): Let's make this less hacky.
if "serialize_out_of_band" in tb:
_already_in_out_of_band_serialization = True
break
if not _already_in_out_of_band_serialization:
return _deserialize_and_fully_execute_if_needed, (ds.serialize_out_of_band(),)
else:
return ds.__reduce__()
@contextlib.contextmanager
def out_of_band_serialize_dataset():
context = ray.worker.global_worker.get_serialization_context()
try:
context._register_cloudpickle_reducer(ray.data.Dataset, _reduce)
yield
finally:
context._unregister_cloudpickle_reducer(ray.data.Dataset)
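For reference, a rough sketch of the intended usage (mirroring the trial runner change further down); `save_experiment_state` is a hypothetical stand-in for the call that pickles the runner state:

from ray.tune.impl.out_of_band_serialize_dataset import out_of_band_serialize_dataset

def save_experiment_state():  # hypothetical: pickles objects that may hold Datasets
    ...

with out_of_band_serialize_dataset():
    # While the reducer is registered, any ray.data.Dataset reached during
    # pickling is serialized via serialize_out_of_band() instead of
    # Dataset.__reduce__().
    save_experiment_state()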

@ -0,0 +1,65 @@
from sklearn.datasets import load_breast_cancer
from ray import tune
from ray.data import read_datasource, Dataset, Datasource, ReadTask
from ray.data.block import BlockMetadata
from ray.tune.impl.utils import execute_dataset
# TODO(xwjiang): Enable this when Clark's out-of-band-serialization is landed.
class TestDatasource(Datasource):
def prepare_read(self, parallelism: int, **read_args):
import pyarrow as pa
def load_data():
data_raw = load_breast_cancer(as_frame=True)
dataset_df = data_raw["data"]
dataset_df["target"] = data_raw["target"]
return [pa.Table.from_pandas(dataset_df)]
meta = BlockMetadata(
num_rows=None,
size_bytes=None,
schema=None,
input_files=None,
exec_stats=None,
)
return [ReadTask(load_data, meta)]
def gen_dataset_func() -> Dataset:
test_datasource = TestDatasource()
return read_datasource(test_datasource)
def test_grid_search():
ds1 = gen_dataset_func()._experimental_lazy().map(lambda x: x)
ds2 = gen_dataset_func()._experimental_lazy().map(lambda x: x)
assert not ds1._plan._has_final_stage_snapshot()
assert not ds2._plan._has_final_stage_snapshot()
param_space = {"train_dataset": tune.grid_search([ds1, ds2])}
execute_dataset(param_space)
executed_ds = param_space["train_dataset"]["grid_search"]
assert len(executed_ds) == 2
assert executed_ds[0]._plan._has_final_stage_snapshot()
assert executed_ds[1]._plan._has_final_stage_snapshot()
def test_choice():
ds1 = gen_dataset_func()._experimental_lazy().map(lambda x: x)
ds2 = gen_dataset_func()._experimental_lazy().map(lambda x: x)
assert not ds1._plan._has_final_stage_snapshot()
assert not ds2._plan._has_final_stage_snapshot()
param_space = {"train_dataset": tune.choice([ds1, ds2])}
execute_dataset(param_space)
executed_ds = param_space["train_dataset"].categories
assert len(executed_ds) == 2
assert executed_ds[0]._plan._has_final_stage_snapshot()
assert executed_ds[1]._plan._has_final_stage_snapshot()
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", "-x", __file__]))

@ -0,0 +1,192 @@
import copy
import os
from typing import Any, Callable, Dict, Optional, Type, Union
import ray.cloudpickle as pickle
from ray.ml.config import RunConfig
from ray.ml.trainer import Trainer
from ray.tune import Experiment, TuneError, ExperimentAnalysis
from ray.tune.impl.utils import execute_dataset
from ray.tune.result_grid import ResultGrid
from ray.tune.trainable import Trainable
from ray.tune.tune import run
from ray.tune.tune_config import TuneConfig
_TRAINABLE_PKL = "trainable.pkl"
_TUNER_PKL = "tuner.pkl"
_TRAINABLE_KEY = "_trainable"
_PARAM_SPACE_KEY = "_param_space"
class TunerInternal:
"""The real implementation behind external facing ``Tuner``.
The external facing ``Tuner`` multiplexes between local Tuner and remote Tuner
depending on whether in Ray client mode.
In Ray client mode, external ``Tuner`` wraps ``TunerInternal`` into a remote actor,
which is guaranteed to be placed on head node.
``TunerInternal`` can be constructed from fresh, in which case, ``trainable`` needs
to be provided, together with optional ``param_space``, ``tune_config`` and
``run_config``.
It can also be restored from a previous failed run (given ``restore_path``).
Args:
restore_path: The path from where the Tuner can be restored. If provided, None
of the rest args are needed.
trainable: The trainable to be tuned.
param_space: Search space of the tuning job.
One thing to note is that both preprocessor and dataset can be tuned here.
tune_config: Tuning algorithm specific configs.
Refer to ray.tune.tune_config.TuneConfig for more info.
run_config: Runtime configuration that is specific to individual trials.
Refer to ray.ml.config.RunConfig for more info.
"""
def __init__(
self,
restore_path: str = None,
trainable: Optional[
Union[
str,
Callable,
Type[Trainable],
Type[Trainer],
Trainer,
]
] = None,
param_space: Optional[Dict[str, Any]] = None,
tune_config: Optional[TuneConfig] = None,
run_config: Optional[RunConfig] = None,
):
# Restored from Tuner checkpoint.
if restore_path:
trainable_ckpt = os.path.join(restore_path, _TRAINABLE_PKL)
with open(trainable_ckpt, "rb") as fp:
trainable = pickle.load(fp)
tuner_ckpt = os.path.join(restore_path, _TUNER_PKL)
with open(tuner_ckpt, "rb") as fp:
tuner = pickle.load(fp)
self.__dict__.update(tuner.__dict__)
self._is_restored = True
self._trainable = trainable
self._experiment_checkpoint_dir = restore_path
return
# Start from fresh
if not trainable:
raise TuneError("You need to provide a trainable to tune.")
self._is_restored = False
self._trainable = trainable
self._tune_config = tune_config
self._run_config = run_config
self._experiment_checkpoint_dir = self._setup_create_experiment_checkpoint_dir(
self._run_config
)
# Not used for restored Tuner.
self._param_space = param_space
self._process_dataset_param()
# This needs to happen before `tune.run()` starts.
# Currently Tune does not exit gracefully when run in Ray client mode - if a
# crash happens, it exits immediately without checkpointing the tuner and the
# trainable. Writing these files first ensures there is something to restore from.
tuner_ckpt = os.path.join(self._experiment_checkpoint_dir, _TUNER_PKL)
with open(tuner_ckpt, "wb") as fp:
pickle.dump(self, fp)
trainable_ckpt = os.path.join(self._experiment_checkpoint_dir, _TRAINABLE_PKL)
with open(trainable_ckpt, "wb") as fp:
pickle.dump(self._trainable, fp)
def _process_dataset_param(self) -> None:
"""Dataset needs to be fully executed before sent over to trainables.
A valid dataset configuration in param space looks like:
"datasets": {
"train_dataset": tune.grid_search([ds1, ds2]),
},
"""
execute_dataset(self._param_space)
def _setup_create_experiment_checkpoint_dir(
self, run_config: Optional[RunConfig]
) -> str:
"""Sets up experiment checkpoint dir before actually running the experiment."""
path = Experiment.get_experiment_checkpoint_dir(
self._convert_trainable(self._trainable),
run_config.local_dir,
run_config.name,
)
if not os.path.exists(path):
os.makedirs(path)
return path
# This has to be done through a function signature (@property won't do).
def experiment_checkpoint_dir(self) -> str:
return self._experiment_checkpoint_dir
@staticmethod
def _convert_trainable(trainable: Any) -> Type[Trainable]:
if isinstance(trainable, Trainer):
trainable = trainable.as_trainable()
else:
trainable = trainable
return trainable
def fit(self) -> ResultGrid:
trainable = self._convert_trainable(self._trainable)
assert self._experiment_checkpoint_dir
if not self._is_restored:
param_space = copy.deepcopy(self._param_space)
analysis = self._fit_internal(trainable, param_space)
else:
analysis = self._fit_resume(trainable)
analysis._legacy_checkpoint = False
return ResultGrid(analysis)
def _fit_internal(self, trainable, param_space) -> ExperimentAnalysis:
"""Fitting for a fresh Tuner."""
analysis = run(
trainable,
config={**param_space},
mode=self._tune_config.mode,
metric=self._tune_config.metric,
num_samples=self._tune_config.num_samples,
search_alg=self._tune_config.search_alg,
scheduler=self._tune_config.scheduler,
name=self._run_config.name,
callbacks=self._run_config.callbacks,
_experiment_checkpoint_dir=self._experiment_checkpoint_dir,
)
return analysis
def _fit_resume(self, trainable) -> ExperimentAnalysis:
"""Fitting for a restored Tuner."""
analysis = run(
trainable,
resume=True,
mode=self._tune_config.mode,
metric=self._tune_config.metric,
callbacks=self._run_config.callbacks,
_experiment_checkpoint_dir=self._experiment_checkpoint_dir,
)
return analysis
def __getstate__(self):
state = self.__dict__.copy()
state.pop(_TRAINABLE_KEY, None)
state.pop(_PARAM_SPACE_KEY, None)
return state
def __setstate__(self, state):
self.__dict__.update(state)
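A hedged sketch of direct (non-Ray-client) use; normally the public ``Tuner`` below constructs this, and the trainable, metric, and config values here are hypothetical:

from ray import tune
from ray.ml.config import RunConfig
from ray.tune.impl.tuner_internal import TunerInternal
from ray.tune.tune_config import TuneConfig

def my_trainable(config):  # hypothetical function trainable
    tune.report(loss=config["lr"])

tuner = TunerInternal(
    trainable=my_trainable,
    param_space={"lr": tune.grid_search([1e-3, 1e-2])},
    tune_config=TuneConfig(metric="loss", mode="min", num_samples=1),
    # RunConfig fields assumed from how they are read in this file (name, local_dir, callbacks).
    run_config=RunConfig(name="my_exp", local_dir="~/ray_results"),
)
result_grid = tuner.fit()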

@ -0,0 +1,28 @@
from ray.data import Dataset
from ray.tune.sample import Categorical
def execute_dataset(config_dict: dict):
"""Going through config dict (params space) and fully execute any Dataset
if necessary.
"""
for k, v in config_dict.items():
if isinstance(v, dict):
execute_dataset(v)
elif isinstance(v, Dataset):
config_dict[k] = v.fully_executed()
# TODO(xwjiang): Consider CV config for beta.
# elif isinstance(v, int):
# # CV settings
# pass
elif isinstance(v, list) or isinstance(v, Categorical):
_list = v if isinstance(v, list) else v.categories
if len(_list) == 0:
return
if isinstance(_list[0], Dataset):
if isinstance(v, list):
config_dict[k] = [_item.fully_executed() for _item in _list]
else: # Categorical
config_dict[k].categories = [
_item.fully_executed() for _item in _list
]
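A small sketch of what this does to a hypothetical param space; ``ray.data.range`` stands in for any Dataset:

import ray
from ray import tune
from ray.tune.impl.utils import execute_dataset

param_space = {
    "train_dataset": ray.data.range(8),
    "datasets": {
        "eval_dataset": tune.grid_search([ray.data.range(4), ray.data.range(4)]),
    },
}
execute_dataset(param_space)
# The bare Dataset and every Dataset inside the grid_search list have been
# replaced, in place, by their fully executed counterparts.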

@ -62,7 +62,6 @@ class ResultGrid:
return None
def _trial_to_result(self, trial: Trial) -> Result:
# TODO(xwjiang): Use Kai's new checkpoint!
result = Result(
checkpoint=trial.checkpoint,
metrics=trial.last_result,

@ -10,6 +10,7 @@ import traceback
import warnings
import ray
from ray.tune.impl.out_of_band_serialize_dataset import out_of_band_serialize_dataset
from ray.util import get_node_ip_address
from ray.tune import TuneError
from ray.tune.callback import CallbackList
@ -160,6 +161,7 @@ class _ExperimentCheckpointManager:
search_alg.save_to_dir(self._checkpoint_dir, session_str=self._session_str)
checkpoint_time_start = time.monotonic()
with out_of_band_serialize_dataset():
_serialize_and_write()
if self._sync_trial_checkpoints:

@ -121,6 +121,8 @@ def run(
raise_on_failed_trial: bool = True,
callbacks: Optional[Sequence[Callback]] = None,
max_concurrent_trials: Optional[int] = None,
# == internal only ==
_experiment_checkpoint_dir: Optional[str] = None,
# Deprecated args
queue_trials: Optional[bool] = None,
loggers: Optional[Sequence[Type[Logger]]] = None,
@ -474,6 +476,7 @@ def run(
resources_per_trial=resources_per_trial,
num_samples=num_samples,
local_dir=local_dir,
_experiment_checkpoint_dir=_experiment_checkpoint_dir,
sync_config=sync_config,
trial_name_creator=trial_name_creator,
trial_dirname_creator=trial_dirname_creator,

@ -1,12 +1,29 @@
from typing import Any, Callable, Dict, Optional, Type, Union
import ray
from ray.ml.config import RunConfig
from ray.ml.trainer import Trainer
from ray.tune import TuneError
from ray.tune.result_grid import ResultGrid
from ray.tune.trainable import Trainable
from ray.tune.impl.tuner_internal import TunerInternal
from ray.tune.tune_config import TuneConfig
from ray.util import PublicAPI
from ray.util.client.common import ClientActorHandle
from ray.util.ml_utils.node import force_on_current_node
# The magic key that is used when instantiating Tuner during resume.
_TUNER_INTERNAL = "_tuner_internal"
_SELF = "self"
@PublicAPI(stability="alpha")
class Tuner:
"""Tuner is the recommended way of launching hyperparameter tuning jobs with Ray Tune.
Attributes:
Args:
trainable: The trainable to be tuned.
param_space: Search space of the tuning job.
Note that both the preprocessor and the dataset can be tuned here.
@ -15,9 +32,6 @@ class Tuner:
run_config: Runtime configuration that is specific to individual trials.
Refer to ray.ml.config.RunConfig for more info.
Returns:
``ResultGrid`` object.
Usage pattern:
.. code-block:: python
@ -52,10 +66,49 @@ class Tuner:
tuner = Tuner.restore(experiment_checkpoint_dir)
tuner.fit()
`experiment_checkpoint_dir` can be easily located near the end of the
``experiment_checkpoint_dir`` can be easily located near the end of the
console output of your first failed run.
"""
# One of the following is assigned.
_local_tuner: Optional[TunerInternal] # Only used in non-Ray-client mode.
_remote_tuner: Optional[ClientActorHandle] # Only used in ray client mode.
def __init__(
self,
trainable: Optional[
Union[
str,
Callable,
Type[Trainable],
Type[Trainer],
Trainer,
]
] = None,
param_space: Optional[Dict[str, Any]] = None,
tune_config: Optional[TuneConfig] = None,
run_config: Optional[RunConfig] = None,
# This is an internal-only arg.
_tuner_internal: Optional[TunerInternal] = None,
):
"""Configure and construct a tune run."""
kwargs = locals().copy()
self._is_ray_client = ray.util.client.ray.is_connected()
if _tuner_internal:
if not self._is_ray_client:
self._local_tuner = kwargs[_TUNER_INTERNAL]
else:
self._remote_tuner = kwargs[_TUNER_INTERNAL]
else:
kwargs.pop(_TUNER_INTERNAL, None)
kwargs.pop(_SELF, None)
if not self._is_ray_client:
self._local_tuner = TunerInternal(**kwargs)
else:
self._remote_tuner = force_on_current_node(
ray.remote(num_cpus=0)(TunerInternal)
).remote(**kwargs)
@classmethod
def restore(cls, path: str) -> "Tuner":
"""Restores Tuner after a previously failed run.
@ -71,7 +124,14 @@ class Tuner:
# restored runs.
# For example, are callbacks supposed to be automatically applied
# when a Tuner is restored and fit again?
raise NotImplementedError
if not ray.util.client.ray.is_connected():
tuner_internal = TunerInternal(restore_path=path)
return Tuner(tuner_internal=tuner_internal)
else:
tuner_internal = force_on_current_node(
ray.remote(num_cpus=0)(TunerInternal)
).remote(restore_path=path)
return Tuner(tuner_internal=tuner_internal)
def fit(self) -> ResultGrid:
"""Executes hyperparameter tuning job as configured and returns result.
@ -80,7 +140,7 @@ class Tuner:
For exceptions that happen during the execution of a trial,
one may inspect them together with the stacktrace through the returned result grid.
See ``ResultGrid`` for reference. Each trial may fail up to a certain number of times.
This is configured by `RunConfig.FailureConfig.max_failures`.
This is configured by ``RunConfig.FailureConfig.max_failures``.
Exceptions that happen outside of trials will be thrown by this method as well.
In such cases, an instruction like the following will be printed out
@ -96,4 +156,25 @@ class Tuner:
TuneError: If errors occur executing the experiment that originate from
Tune.
"""
raise NotImplementedError
if not self._is_ray_client:
try:
return self._local_tuner.fit()
except Exception as e:
raise TuneError(
f"Tune run failed."
f'Please use tuner = Tuner.restore("'
f'{self._local_tuner.experiment_checkpoint_dir}") to resume.'
) from e
else:
experiment_checkpoint_dir = ray.get(
self._remote_tuner.experiment_checkpoint_dir.remote()
)
try:
return ray.get(self._remote_tuner.fit.remote())
except Exception as e:
raise TuneError(
f"Tune run failed."
f'Please use tuner = Tuner.restore("'
f'{experiment_checkpoint_dir}") to resume.'
) from e