diff --git a/python/ray/air/tests/test_resource_changing.py b/python/ray/air/tests/test_resource_changing.py
index 2299c5dab..e5e6b9cc4 100644
--- a/python/ray/air/tests/test_resource_changing.py
+++ b/python/ray/air/tests/test_resource_changing.py
@@ -63,9 +63,9 @@ class AssertingXGBoostTrainer(XGBoostTrainer):
     @property
     def _ray_params(self):
         scaling_config = self._validate_scaling_config(self.scaling_config)
-        assert (
-            scaling_config.as_placement_group_factory() == session.get_trial_resources()
-        )
+        pgf = scaling_config.as_placement_group_factory()
+        tr = session.get_trial_resources()
+        assert pgf == tr, (scaling_config, pgf, tr)
         return super()._ray_params
 
 
diff --git a/python/ray/train/gbdt_trainer.py b/python/ray/train/gbdt_trainer.py
index da6d4f5c9..c40459113 100644
--- a/python/ray/train/gbdt_trainer.py
+++ b/python/ray/train/gbdt_trainer.py
@@ -26,18 +26,41 @@ def _convert_scaling_config_to_ray_params(
     ray_params_cls: Type["xgboost_ray.RayParams"],
     default_ray_params: Optional[Dict[str, Any]] = None,
 ) -> "xgboost_ray.RayParams":
-    default_ray_params = default_ray_params or {}
-    resources_per_worker = scaling_config.additional_resources_per_worker
-    num_workers = scaling_config.num_workers
-    cpus_per_worker = scaling_config.num_cpus_per_worker
-    gpus_per_worker = scaling_config.num_gpus_per_worker
+    """Scaling config parameters have precedence over default ray params.
+    Default ray params are defined in the trainers (xgboost/lightgbm),
+    but if the user requests something else, that should be respected.
+    """
+    resources = (scaling_config.resources_per_worker or {}).copy()
+
+    cpus_per_actor = resources.pop("CPU", 0)
+    if not cpus_per_actor:
+        cpus_per_actor = default_ray_params.get("cpus_per_actor", 0)
+
+    gpus_per_actor = resources.pop("GPU", int(scaling_config.use_gpu))
+    if not gpus_per_actor:
+        gpus_per_actor = default_ray_params.get("gpus_per_actor", 0)
+
+    resources_per_actor = resources
+    if not resources_per_actor:
+        resources_per_actor = default_ray_params.get("resources_per_actor", None)
+
+    num_actors = scaling_config.num_workers
+    if not num_actors:
+        num_actors = default_ray_params.get("num_actors", 0)
+
+    ray_params_kwargs = default_ray_params.copy() or {}
+
+    ray_params_kwargs.update(
+        {
+            "cpus_per_actor": int(cpus_per_actor),
+            "gpus_per_actor": int(gpus_per_actor),
+            "resources_per_actor": resources_per_actor,
+            "num_actors": int(num_actors),
+        }
+    )
 
     ray_params = ray_params_cls(
-        num_actors=int(num_workers),
-        cpus_per_actor=int(cpus_per_worker),
-        gpus_per_actor=int(gpus_per_worker),
-        resources_per_actor=resources_per_worker,
-        **default_ray_params,
+        **ray_params_kwargs,
     )
 
     return ray_params
diff --git a/python/ray/train/lightgbm/lightgbm_trainer.py b/python/ray/train/lightgbm/lightgbm_trainer.py
index 5acf5bbcd..30e4f4c69 100644
--- a/python/ray/train/lightgbm/lightgbm_trainer.py
+++ b/python/ray/train/lightgbm/lightgbm_trainer.py
@@ -75,6 +75,9 @@ class LightGBMTrainer(GBDTTrainer):
     _default_ray_params: Dict[str, Any] = {
         "checkpoint_frequency": 1,
         "allow_less_than_two_cpus": True,
+        "num_actors": 1,
+        "cpus_per_actor": 2,
+        "gpus_per_actor": 0,
     }
     _init_model_arg_name: str = "init_model"
 
diff --git a/python/ray/train/tests/test_lightgbm_trainer.py b/python/ray/train/tests/test_lightgbm_trainer.py
index a7b9fdcdb..2affbb0ea 100644
--- a/python/ray/train/tests/test_lightgbm_trainer.py
+++ b/python/ray/train/tests/test_lightgbm_trainer.py
@@ -18,8 +18,8 @@ from sklearn.model_selection import train_test_split
 
 
 @pytest.fixture
-def ray_start_4_cpus():
-    address_info = ray.init(num_cpus=4)
+def ray_start_6_cpus():
+    address_info = ray.init(num_cpus=6)
     yield address_info
     # The code after the yield will run as teardown code.
     ray.shutdown()
@@ -42,7 +42,7 @@ def get_num_trees(booster: lgbm.Booster) -> int:
     return booster.current_iteration()
 
 
-def test_fit_with_categoricals(ray_start_4_cpus):
+def test_fit_with_categoricals(ray_start_6_cpus):
    train_df_with_cat = train_df.copy()
    test_df_with_cat = test_df.copy()
    train_df_with_cat["categorical_column"] = pd.Series(
@@ -66,7 +66,7 @@
     assert model.pandas_categorical == [["A", "B"]]
 
 
-def test_resume_from_checkpoint(ray_start_4_cpus, tmpdir):
+def test_resume_from_checkpoint(ray_start_6_cpus, tmpdir):
     train_dataset = ray.data.from_pandas(train_df)
     valid_dataset = ray.data.from_pandas(test_df)
     trainer = LightGBMTrainer(
@@ -112,7 +112,7 @@
         (0, False, 0),
     ],
 )
-def test_checkpoint_freq(ray_start_4_cpus, freq_end_expected):
+def test_checkpoint_freq(ray_start_6_cpus, freq_end_expected):
     freq, end, expected = freq_end_expected
 
     train_dataset = ray.data.from_pandas(train_df)
@@ -144,7 +144,7 @@
     assert cp_paths == sorted(cp_paths), str(cp_paths)
 
 
-def test_preprocessor_in_checkpoint(ray_start_4_cpus, tmpdir):
+def test_preprocessor_in_checkpoint(ray_start_6_cpus, tmpdir):
     train_dataset = ray.data.from_pandas(train_df)
     valid_dataset = ray.data.from_pandas(test_df)
 
@@ -183,7 +183,7 @@
     assert preprocessor.fitted_
 
 
-def test_tune(ray_start_4_cpus):
+def test_tune(ray_start_6_cpus):
     train_dataset = ray.data.from_pandas(train_df)
     valid_dataset = ray.data.from_pandas(test_df)
     trainer = LightGBMTrainer(
@@ -203,7 +203,7 @@
     assert trainer.params["max_depth"] == 1
 
 
-def test_validation(ray_start_4_cpus):
+def test_validation(ray_start_6_cpus):
     train_dataset = ray.data.from_pandas(train_df)
     valid_dataset = ray.data.from_pandas(test_df)
     with pytest.raises(KeyError, match=TRAIN_DATASET_KEY):
@@ -223,6 +223,25 @@
     )
 
 
+def test_default_parameters_default():
+    trainer = LightGBMTrainer(
+        datasets={TRAIN_DATASET_KEY: ray.data.from_pandas(train_df)},
+        label_column="target",
+        params=params,
+    )
+    assert trainer._ray_params.cpus_per_actor == 2
+
+
+def test_default_parameters_scaling_config():
+    trainer = LightGBMTrainer(
+        datasets={TRAIN_DATASET_KEY: ray.data.from_pandas(train_df)},
+        label_column="target",
+        params=params,
+        scaling_config=ScalingConfig(resources_per_worker={"CPU": 4}),
+    )
+    assert trainer._ray_params.cpus_per_actor == 4
+
+
 if __name__ == "__main__":
     import pytest
     import sys
diff --git a/python/ray/train/xgboost/xgboost_trainer.py b/python/ray/train/xgboost/xgboost_trainer.py
index f7fe79937..1fe14e001 100644
--- a/python/ray/train/xgboost/xgboost_trainer.py
+++ b/python/ray/train/xgboost/xgboost_trainer.py
@@ -1,4 +1,4 @@
-from typing import Optional, Tuple, TYPE_CHECKING
+from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING
 
 from ray.air.checkpoint import Checkpoint
 from ray.train.gbdt_trainer import GBDTTrainer
@@ -66,6 +66,11 @@ class XGBoostTrainer(GBDTTrainer):
     _ray_params_cls: type = xgboost_ray.RayParams
     _tune_callback_report_cls: type = TuneReportCallback
     _tune_callback_checkpoint_cls: type = TuneReportCheckpointCallback
+    _default_ray_params: Dict[str, Any] = {
+        "num_actors": 1,
+        "cpus_per_actor": 1,
+        "gpus_per_actor": 0,
+    }
     _init_model_arg_name: str = "xgb_model"
 
     def _train(self, **kwargs):