diff --git a/.buildkite/pipeline.ml.yml b/.buildkite/pipeline.ml.yml index 03e555279..74b14e725 100644 --- a/.buildkite/pipeline.ml.yml +++ b/.buildkite/pipeline.ml.yml @@ -318,6 +318,13 @@ - DATA_PROCESSING_TESTING=1 PYTHON=3.7 ./ci/travis/install-dependencies.sh - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/workflow/... python/ray/data/... +- label: ":slot_machine: ML Utils tests" + conditions: ["RAY_CI_ML_UTILS_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT + - TUNE_TESTING=1 ./ci/travis/install-dependencies.sh + - bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/util/ml_utils/... + - label: ":book: Doc tests and examples" conditions: ["RAY_CI_PYTHON_AFFECTED", "RAY_CI_TUNE_AFFECTED", "RAY_CI_DOC_AFFECTED"] diff --git a/ci/travis/determine_tests_to_run.py b/ci/travis/determine_tests_to_run.py index ac2725118..37d752937 100644 --- a/ci/travis/determine_tests_to_run.py +++ b/ci/travis/determine_tests_to_run.py @@ -148,6 +148,7 @@ if __name__ == "__main__": RAY_CI_SGD_AFFECTED = 1 RAY_CI_TUNE_AFFECTED = 1 RAY_CI_RLLIB_AFFECTED = 1 + RAY_CI_ML_UTILS_AFFECTED = 1 elif re.match("^(python/ray/)?rllib/", changed_file): RAY_CI_RLLIB_AFFECTED = 1 RAY_CI_RLLIB_DIRECTLY_AFFECTED = 1 diff --git a/doc/examples/datasets_train/datasets_train.py b/doc/examples/datasets_train/datasets_train.py index d38b2b7ff..bc4625432 100644 --- a/doc/examples/datasets_train/datasets_train.py +++ b/doc/examples/datasets_train/datasets_train.py @@ -25,36 +25,11 @@ import torch.optim as optim from ray import train from ray.data.aggregate import Mean, Std from ray.train import Trainer -from ray.train import TrainingCallback +from ray.train.callbacks.logging import MLflowLoggerCallback from ray.train.callbacks import TBXLoggerCallback from torch.nn.parallel import DistributedDataParallel -# TODO(amogkam): Upstream this into Ray Train. -class MLflowCallback(TrainingCallback): - def __init__(self, config): - self.config = config - - def handle_result(self, results, **info): - # For each result that's being reported by ``train.report()``, - # we get the result from the rank 0 worker (i.e. first worker) and - # report it to MLflow. - rank_zero_results = results[0] - mlflow.log_metrics(rank_zero_results) - - # TODO: fix type hint for logdir - def start_training(self, logdir, **info): - mlflow.start_run(run_name=str(logdir.name)) - mlflow.log_params(config) - - # TODO: Update TrainCallback to provide logdir in finish_training. - self.logdir = logdir - - def finish_training(self, error: bool = False, **info): - # Save the Trainer checkpoints as artifacts to mlflow. - mlflow.log_artifacts(self.logdir) - - def make_and_upload_dataset(dir_path): import random @@ -641,7 +616,11 @@ if __name__ == "__main__": # and should also create 1 directory per file. tbx_logdir = "./runs" os.makedirs(tbx_logdir, exist_ok=True) - callbacks = [TBXLoggerCallback(logdir=tbx_logdir), MLflowCallback(config)] + callbacks = [ + TBXLoggerCallback(logdir=tbx_logdir), + MLflowLoggerCallback( + experiment_name="cuj-big-data-training", save_artifact=True) + ] # Remove CPU resource so Datasets can be scheduled. resources_per_worker = {"CPU": 0, "GPU": 1} if use_gpu else None diff --git a/doc/source/train/api.rst b/doc/source/train/api.rst index fbe1da0b7..cdcab783d 100644 --- a/doc/source/train/api.rst +++ b/doc/source/train/api.rst @@ -72,6 +72,13 @@ TBXLoggerCallback .. 
autoclass:: ray.train.callbacks.TBXLoggerCallback

+.. _train-api-mlflow-logger-callback:
+
+MLflowLoggerCallback
+~~~~~~~~~~~~~~~~~~~~
+
+.. autoclass:: ray.train.callbacks.MLflowLoggerCallback
+

 Checkpointing
 -------------
diff --git a/doc/source/train/examples.rst b/doc/source/train/examples.rst
index 0693979a6..54eb11410 100644
--- a/doc/source/train/examples.rst
+++ b/doc/source/train/examples.rst
@@ -47,11 +47,10 @@ Horovod
     Simple example for Horovod (with TensorFlow)

-Iterator API Examples
----------------------
-
+Logger/Callback Examples
+------------------------
 * :doc:`/train/examples/mlflow_fashion_mnist_example`:
-  Example for using the Iterator API for custom MLFlow integration.
+  Example for logging training to MLflow via the ``MLflowLoggerCallback``.

 Ray Datasets Integration Examples
 ---------------------------------
diff --git a/doc/source/train/user_guide.rst b/doc/source/train/user_guide.rst
index 633a6993a..bada650f9 100644
--- a/doc/source/train/user_guide.rst
+++ b/doc/source/train/user_guide.rst
@@ -317,7 +317,7 @@ Log directories are exposed through the following attributes:

 Logs will be written by:

-1. :ref:`Logging Callbacks `
+1. :ref:`Callbacks `
 2. :ref:`Checkpoints `

 .. TODO link to Training Run Iterator API as a 3rd option for logging.
@@ -327,6 +327,11 @@ Logs will be written by:
 Logging, Monitoring, and Callbacks
 ----------------------------------

+Ray Train collects intermediate results from the training workers during the training run
+and provides a :ref:`Callback interface ` for acting on these intermediate results (such as logging, aggregation, or printing).
+You can use either the :ref:`built-in callbacks ` that Ray Train provides,
+or implement a :ref:`custom callback ` for your use case.
+
 Reporting intermediate results
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

@@ -393,9 +398,64 @@ Callbacks
 ~~~~~~~~~

 You may want to plug in your training code with your favorite experiment management framework.
-Ray Train provides an interface to fetch intermediate results and callbacks to process/log your intermediate results.
+Ray Train provides an interface to fetch intermediate results and callbacks to process/log your intermediate results
+(the values passed into ``train.report(...)``).

-You can plug all of these into Ray Train with the following interface:
+Ray Train contains built-in callbacks for popular tracking frameworks, or you can implement your own callback via the ``TrainingCallback`` interface.
+
+.. _train-builtin-callbacks:
+
+Built-in Callbacks
+++++++++++++++++++
+
+The following ``TrainingCallback``\s are available and will log the intermediate results of the training run.
+
+1. :ref:`train-api-json-logger-callback`
+2. :ref:`train-api-tbx-logger-callback`
+3. :ref:`train-api-mlflow-logger-callback`
+
+Example: Logging to MLflow and TensorBoard
+++++++++++++++++++++++++++++++++++++++++++
+
+**Step 1: Install the necessary packages**
+
+.. code-block:: bash
+
+    $ pip install mlflow
+    $ pip install tensorboardX
+
+**Step 2: Run the following training script**
+
+.. literalinclude:: /../../python/ray/train/examples/mlflow_simple_example.py
+   :language: python
+
+**Step 3: Visualize the logs**
+
+.. code-block:: bash
+
+    # Navigate to the run directory of the trainer.
+    # For example `cd /home/ray_results/train_2021-09-01_12-00-00/run_001`
+    $ cd 

+
+    # View the MLflow UI.
+    $ mlflow ui
+
+    # View the TensorBoard UI.
+    $ tensorboard --logdir .
+
+
+.. 
_train-custom-callbacks: + +Custom Callbacks +++++++++++++++++ + +If the provided callbacks do not cover your desired integrations or use-cases, +you may always implement a custom callback by subclassing ``TrainingCallback``. If +the callback is general enough, please feel welcome to `add it `_ to the ``ray`` +`repository `_. + +A simple example for creating a callback that will print out results: .. code-block:: python @@ -422,44 +482,6 @@ You can plug all of these into Ray Train with the following interface: # [{'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0014500617980957031, '_training_iteration': 3}, {'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0015292167663574219, '_training_iteration': 3}] trainer.shutdown() -.. Here is a list of callbacks that are supported by Ray Train: - -.. * JsonLoggerCallback -.. * TBXLoggerCallback -.. * WandbCallback -.. * MlflowCallback -.. * CSVCallback - -.. _train-logging-callbacks: - -Logging Callbacks -+++++++++++++++++ - -The following ``TrainingCallback``\s are available and will write to a file within the -:ref:`log directory ` of each training run. - -1. :ref:`train-api-json-logger-callback` -2. :ref:`train-api-tbx-logger-callback` - -Custom Callbacks -++++++++++++++++ - -If the provided callbacks do not cover your desired integrations or use-cases, -you may always implement a custom callback by subclassing ``TrainingCallback``. If -the callback is general enough, please feel welcome to `add it `_ to the ``ray`` -`repository `_. - -A simple example for creating a callback that will print out results: - -.. code-block:: python - - from ray.train import TrainingCallback - - class PrintingCallback(TrainingCallback): - def handle_result(self, results: List[Dict], **info): - print(results) - .. Advanced Customization diff --git a/python/ray/train/BUILD b/python/ray/train/BUILD index 617edd4ec..0733a39fc 100644 --- a/python/ray/train/BUILD +++ b/python/ray/train/BUILD @@ -12,6 +12,15 @@ py_test( args = ["--smoke-test"] ) +py_test( + name = "mlflow_simple_example", + size = "medium", + main = "examples/mlflow_simple_example.py", + srcs = ["examples/mlflow_simple_example.py"], + tags = ["team:ml", "exclusive"], + deps = [":train_lib"], +) + py_test( name = "tensorflow_quick_start", size = "medium", diff --git a/python/ray/train/callbacks/__init__.py b/python/ray/train/callbacks/__init__.py index bd90eaa7f..adea4d107 100644 --- a/python/ray/train/callbacks/__init__.py +++ b/python/ray/train/callbacks/__init__.py @@ -1,4 +1,8 @@ from ray.train.callbacks.callback import TrainingCallback -from ray.train.callbacks.logging import (JsonLoggerCallback, TBXLoggerCallback) +from ray.train.callbacks.logging import ( + JsonLoggerCallback, MLflowLoggerCallback, TBXLoggerCallback) -__all__ = ["TrainingCallback", "JsonLoggerCallback", "TBXLoggerCallback"] +__all__ = [ + "TrainingCallback", "JsonLoggerCallback", "MLflowLoggerCallback", + "TBXLoggerCallback" +] diff --git a/python/ray/train/callbacks/callback.py b/python/ray/train/callbacks/callback.py index 0cc28d736..e889c7862 100644 --- a/python/ray/train/callbacks/callback.py +++ b/python/ray/train/callbacks/callback.py @@ -16,12 +16,13 @@ class TrainingCallback(metaclass=abc.ABCMeta): """ pass - def start_training(self, logdir: str, **info): + def start_training(self, logdir: str, config: Dict, **info): """Called once on training start. Args: logdir (str): Path to the file directory where logs should be persisted. + config (Dict): The config dict passed into ``trainer.run()``. 
**info: kwargs dict for forward compatibility. """ pass diff --git a/python/ray/train/callbacks/logging.py b/python/ray/train/callbacks/logging.py index 41ead4db3..5c9f8b53e 100644 --- a/python/ray/train/callbacks/logging.py +++ b/python/ray/train/callbacks/logging.py @@ -1,3 +1,4 @@ +import os from typing import Iterable, List, Optional, Dict, Set, Tuple, Union import abc import warnings @@ -11,13 +12,15 @@ from ray.util.ml_utils.dict import flatten_dict from ray.util.ml_utils.json import SafeFallbackEncoder from ray.train.callbacks import TrainingCallback from ray.train.constants import (RESULT_FILE_JSON, TRAINING_ITERATION, - TIME_TOTAL_S, TIMESTAMP, PID) + TIME_TOTAL_S, TIMESTAMP, PID, + TRAIN_CHECKPOINT_SUBDIR) +from ray.util.ml_utils.mlflow import MLflowLoggerUtil logger = logging.getLogger(__name__) class TrainingLogdirMixin: - def start_training(self, logdir: str, **info): + def start_training(self, logdir: str): if self._logdir: logdir_path = Path(self._logdir) else: @@ -83,8 +86,8 @@ class TrainingSingleFileLoggingCallback( raise ValueError("filename cannot be None or empty.") return logdir_path.joinpath(Path(filename)) - def start_training(self, logdir: str, **info): - super().start_training(logdir, **info) + def start_training(self, logdir: str): + super().start_training(logdir) if not self._filename: filename = self._default_filename @@ -117,7 +120,7 @@ class JsonLoggerCallback(TrainingSingleFileLoggingCallback): _default_filename: Union[str, Path] = RESULT_FILE_JSON def start_training(self, logdir: str, **info): - super().start_training(logdir, **info) + super().start_training(logdir) # Create a JSON file with an empty list # that will be latter appended to @@ -174,6 +177,96 @@ class TrainingSingleWorkerLoggingCallback( return worker_to_log +class MLflowLoggerCallback(TrainingSingleWorkerLoggingCallback): + """MLflow Logger to automatically log Train results and config to MLflow. + + MLflow (https://mlflow.org) Tracking is an open source library for + recording and querying experiments. This Ray Train callback + sends information (config parameters, training results & metrics, + and artifacts) to MLflow for automatic experiment tracking. + + Args: + tracking_uri (Optional[str]): The tracking URI for where to manage + experiments and runs. This can either be a local file path or a + remote server. If None is passed in, the logdir of the trainer + will be used as the tracking URI. + This arg gets passed directly to mlflow initialization. + registry_uri (Optional[str]): The registry URI that gets passed + directly to mlflow initialization. If None is passed in, the + logdir of the trainer will be used as the registry URI. + experiment_id (Optional[str]): The experiment id of an already + existing experiment. If not + passed in, experiment_name will be used. + experiment_name (Optional[str]): The experiment name to use for this + Train run. + If the experiment with the name already exists with MLflow, + it will be used. If not, a new experiment will be created with + this name. At least one of ``experiment_id`` or + ``experiment_name`` must be passed in. + tags (Optional[Dict]): An optional dictionary of string keys and + values to set as tags on the run + save_artifact (bool): If set to True, automatically save the entire + contents of the Train local_dir as an artifact to the + corresponding run in MlFlow. + logdir (Optional[str]): Path to directory where the results file + should be. If None, will be set by the Trainer. 
If no tracking + uri or registry uri are passed in, the logdir will be used for + both. + worker_to_log (int): Worker index to log. By default, will log the + worker with index 0. + """ + + def __init__(self, + tracking_uri: Optional[str] = None, + registry_uri: Optional[str] = None, + experiment_id: Optional[str] = None, + experiment_name: Optional[str] = None, + tags: Optional[Dict] = None, + save_artifact: bool = False, + logdir: Optional[str] = None, + worker_to_log: int = 0): + super().__init__(logdir=logdir, worker_to_log=worker_to_log) + + self.tracking_uri = tracking_uri + self.registry_uri = registry_uri + self.experiment_id = experiment_id + self.experiment_name = experiment_name + self.tags = tags + + self.save_artifact = save_artifact + self.mlflow_util = MLflowLoggerUtil() + + def start_training(self, logdir: str, config: Dict, **info): + super().start_training(logdir=logdir) + + tracking_uri = self.tracking_uri or os.path.join( + str(self.logdir), "mlruns") + registry_uri = self.registry_uri or os.path.join( + str(self.logdir), "mlruns") + + self.mlflow_util.setup_mlflow( + tracking_uri=tracking_uri, + registry_uri=registry_uri, + experiment_id=self.experiment_id, + experiment_name=self.experiment_name, + create_experiment_if_not_exists=True) + + self.mlflow_util.start_run(tags=self.tags, set_active=True) + self.mlflow_util.log_params(params_to_log=config) + + def handle_result(self, results: List[Dict], **info): + result = results[self._workers_to_log] + + self.mlflow_util.log_metrics( + metrics_to_log=result, step=result[TRAINING_ITERATION]) + + def finish_training(self, error: bool = False, **info): + checkpoint_dir = self.logdir.joinpath(TRAIN_CHECKPOINT_SUBDIR) + if self.save_artifact and checkpoint_dir.exists(): + self.mlflow_util.save_artifacts(dir=str(checkpoint_dir)) + self.mlflow_util.end_run(status="FAILED" if error else "FINISHED") + + class TBXLoggerCallback(TrainingSingleWorkerLoggingCallback): """Logs Train results in TensorboardX format. @@ -189,7 +282,7 @@ class TBXLoggerCallback(TrainingSingleWorkerLoggingCallback): IGNORE_KEYS: Set[str] = {PID, TIMESTAMP, TIME_TOTAL_S, TRAINING_ITERATION} def start_training(self, logdir: str, **info): - super().start_training(logdir, **info) + super().start_training(logdir) try: from tensorboardX import SummaryWriter diff --git a/python/ray/train/checkpoint.py b/python/ray/train/checkpoint.py index 9dbba5826..86efcab14 100644 --- a/python/ray/train/checkpoint.py +++ b/python/ray/train/checkpoint.py @@ -7,7 +7,8 @@ from pathlib import Path from typing import List, Optional, Dict, Union, Callable from ray import cloudpickle -from ray.train.constants import TIMESTAMP, TUNE_INSTALLED +from ray.train.constants import TIMESTAMP, TUNE_INSTALLED, \ + TRAIN_CHECKPOINT_SUBDIR from ray.train.constants import TUNE_CHECKPOINT_FILE_NAME, \ TUNE_CHECKPOINT_ID from ray.train.session import TrainingResult @@ -256,7 +257,7 @@ class CheckpointManager: @property def latest_checkpoint_dir(self) -> Optional[Path]: """Path to the latest checkpoint directory.""" - checkpoint_dir = Path("checkpoints") + checkpoint_dir = Path(TRAIN_CHECKPOINT_SUBDIR) return construct_path(checkpoint_dir, self.run_dir) @property diff --git a/python/ray/train/constants.py b/python/ray/train/constants.py index 31fb787c6..4ce55c7ca 100644 --- a/python/ray/train/constants.py +++ b/python/ray/train/constants.py @@ -33,9 +33,12 @@ RESULT_FILE_JSON = "results.json" # Default directory where all Train logs, checkpoints, etc. will be stored. 
DEFAULT_RESULTS_DIR = Path("~/ray_results").expanduser() -# File name to use for checkpoints saved with Tune +# File name to use for checkpoints saved with Tune. TUNE_CHECKPOINT_FILE_NAME = "checkpoint" +# The name of the subdirectory inside the trainer run_dir to store checkpoints. +TRAIN_CHECKPOINT_SUBDIR = "checkpoints" + # The key to use to specify the checkpoint id for Tune. # This needs to be added to the checkpoint dictionary so if the Tune trial # is restarted, the checkpoint_id can continue to increment. diff --git a/python/ray/train/examples/mlflow_fashion_mnist_example.py b/python/ray/train/examples/mlflow_fashion_mnist_example.py index 5205b5e94..ed92acde2 100644 --- a/python/ray/train/examples/mlflow_fashion_mnist_example.py +++ b/python/ray/train/examples/mlflow_fashion_mnist_example.py @@ -1,30 +1,26 @@ import argparse -import mlflow - from ray.train import Trainer from ray.train.examples.train_fashion_mnist_example import train_func +from ray.train.callbacks.logging import MLflowLoggerCallback def main(num_workers=2, use_gpu=False): - mlflow.set_experiment("train_torch_fashion_mnist") - trainer = Trainer( backend="torch", num_workers=num_workers, use_gpu=use_gpu) trainer.start() - iterator = trainer.run_iterator( + final_results = trainer.run( train_func=train_func, config={ "lr": 1e-3, "batch_size": 64, "epochs": 4 - }) + }, + callbacks=[ + MLflowLoggerCallback(experiment_name="train_fashion_mnist") + ]) - for intermediate_result in iterator: - first_worker_result = intermediate_result[0] - mlflow.log_metric("loss", first_worker_result["loss"]) - - print("Full losses for rank 0 worker: ", iterator.get_final_results()) + print("Full losses for rank 0 worker: ", final_results) if __name__ == "__main__": diff --git a/python/ray/train/examples/mlflow_simple_example.py b/python/ray/train/examples/mlflow_simple_example.py new file mode 100644 index 000000000..4d73489bb --- /dev/null +++ b/python/ray/train/examples/mlflow_simple_example.py @@ -0,0 +1,27 @@ +from ray import train +from ray.train import Trainer +from ray.train.callbacks import MLflowLoggerCallback, TBXLoggerCallback + + +def train_func(): + for i in range(3): + train.report(epoch=i) + + +trainer = Trainer(backend="torch", num_workers=2) +trainer.start() + +# Run the training function, logging all the intermediate results +# to MLflow and Tensorboard. +result = trainer.run( + train_func, + callbacks=[ + MLflowLoggerCallback(experiment_name="train_experiment"), + TBXLoggerCallback() + ]) + +# Print the latest run directory and keep note of it. 
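+# Both the MLflow runs (under an `mlruns` subdirectory) and the TBX event
+# files are written inside this run directory, assuming the default
+# tracking URI and logdir settings of the callbacks above are used.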
+# For example: /home/ray_results/train_2021-09-01_12-00-00/run_001 +print("Run directory:", trainer.latest_run_dir) + +trainer.shutdown() diff --git a/python/ray/train/tests/test_callbacks.py b/python/ray/train/tests/test_callbacks.py index 09fe1bb55..e3122cab0 100644 --- a/python/ray/train/tests/test_callbacks.py +++ b/python/ray/train/tests/test_callbacks.py @@ -15,6 +15,7 @@ from ray.train.constants import (TRAINING_ITERATION, DETAILED_AUTOFILLED_KEYS, from ray.train.callbacks import JsonLoggerCallback, TBXLoggerCallback from ray.train.backend import BackendConfig, Backend from ray.train.worker_group import WorkerGroup +from ray.train.callbacks.logging import MLflowLoggerCallback try: from tensorflow.python.summary.summary_iterator \ @@ -136,8 +137,6 @@ def _validate_tbx_result(events_dir): assert len(results["hello/world"]) == 1 -@pytest.mark.skipif( - summary_iterator is None, reason="tensorboard is not installed") def test_TBX(ray_start_4_cpus, make_temp_dir): config = TestConfig() @@ -159,6 +158,54 @@ def test_TBX(ray_start_4_cpus, make_temp_dir): _validate_tbx_result(temp_dir) +def test_mlflow(ray_start_4_cpus, make_temp_dir): + config = TestConfig() + + params = {"p1": "p1"} + + temp_dir = make_temp_dir + num_workers = 4 + + def train_func(config): + train.report(episode_reward_mean=4) + train.report(episode_reward_mean=5) + train.report(episode_reward_mean=6) + return 1 + + callback = MLflowLoggerCallback( + experiment_name="test_exp", logdir=temp_dir) + trainer = Trainer(config, num_workers=num_workers) + trainer.start() + trainer.run(train_func, config=params, callbacks=[callback]) + + from mlflow.tracking import MlflowClient + + client = MlflowClient( + tracking_uri=callback.mlflow_util._mlflow.get_tracking_uri()) + + all_runs = callback.mlflow_util._mlflow.search_runs(experiment_ids=["0"]) + assert len(all_runs) == 1 + # all_runs is a pandas dataframe. 
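+    # Convert the dataframe into a list of per-run dicts to access the
+    # run_id of the single run created above.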
+ all_runs = all_runs.to_dict(orient="records") + run_id = all_runs[0]["run_id"] + run = client.get_run(run_id) + + assert run.data.params == params + assert "episode_reward_mean" in run.data.metrics and \ + run.data.metrics["episode_reward_mean"] == 6.0 + assert TRAINING_ITERATION in run.data.metrics and \ + run.data.metrics[TRAINING_ITERATION] == 3.0 + + metric_history = client.get_metric_history( + run_id=run_id, key="episode_reward_mean") + + assert len(metric_history) == 3 + iterations = [metric.step for metric in metric_history] + assert iterations == [1, 2, 3] + rewards = [metric.value for metric in metric_history] + assert rewards == [4, 5, 6] + + if __name__ == "__main__": import pytest import sys diff --git a/python/ray/train/trainer.py b/python/ray/train/trainer.py index 9622a251a..c6222a839 100644 --- a/python/ray/train/trainer.py +++ b/python/ray/train/trainer.py @@ -281,7 +281,8 @@ class Trainer: finished_with_errors = False for callback in callbacks: - callback.start_training(logdir=self.latest_run_dir) + callback.start_training( + logdir=str(self.latest_run_dir), config=config or {}) train_func = self._get_train_func(train_func, config) diff --git a/python/ray/tune/integration/mlflow.py b/python/ray/tune/integration/mlflow.py index 50fd624c6..0744a7a15 100644 --- a/python/ray/tune/integration/mlflow.py +++ b/python/ray/tune/integration/mlflow.py @@ -1,4 +1,3 @@ -import os from typing import Dict, Callable, Optional import logging @@ -7,18 +6,12 @@ from ray.tune.trainable import Trainable from ray.tune.logger import Logger, LoggerCallback from ray.tune.result import TRAINING_ITERATION, TIMESTEPS_TOTAL from ray.tune.trial import Trial +from ray.util.annotations import Deprecated +from ray.util.ml_utils.mlflow import MLflowLoggerUtil logger = logging.getLogger(__name__) -def _import_mlflow(): - try: - import mlflow - except ImportError: - mlflow = None - return mlflow - - class MLflowLoggerCallback(LoggerCallback): """MLflow Logger to automatically log Tune results and config to MLflow. @@ -30,16 +23,13 @@ class MLflowLoggerCallback(LoggerCallback): Args: tracking_uri (str): The tracking URI for where to manage experiments and runs. This can either be a local file path or a remote server. - This arg gets passed directly to mlflow.tracking.MlflowClient + This arg gets passed directly to mlflow initialization. When using Tune in a multi-node setting, make sure to set this to a remote server and not a local file path. registry_uri (str): The registry URI that gets passed directly to - mlflow.tracking.MlflowClient initialization. + mlflow initialization. experiment_name (str): The experiment name to use for this Tune run. - If None is passed in here, the Logger will automatically then - check the MLFLOW_EXPERIMENT_NAME and then the MLFLOW_EXPERIMENT_ID - environment variables to determine the experiment name. - If the experiment with the name already exists with MlFlow, + If the experiment with the name already exists with MLflow, it will be reused. If not, a new experiment will be created with that name. 
tags (Dict): An optional dictionary of string keys and values to set @@ -84,7 +74,9 @@ class MLflowLoggerCallback(LoggerCallback): self.registry_uri = registry_uri self.experiment_name = experiment_name self.tags = tags - self.save_artifact = save_artifact + self.should_save_artifact = save_artifact + + self.mlflow_util = MLflowLoggerUtil() if ray.util.client.ray.is_connected(): logger.warning("When using MLflowLoggerCallback with Ray Client, " @@ -94,55 +86,17 @@ class MLflowLoggerCallback(LoggerCallback): "setup on the server side and not on the client " "side.") - def setup(self): - mlflow = _import_mlflow() - if mlflow is None: - raise RuntimeError("MLflow has not been installed. Please `pip " - "install mlflow` to use the MLflowLogger.") - - from mlflow.tracking import MlflowClient - - self.client = MlflowClient( - tracking_uri=self.tracking_uri, registry_uri=self.registry_uri) - - if self.experiment_name is None: - # If no name is passed in, then check env vars. - # First check if experiment_name env var is set. - self.experiment_name = os.environ.get("MLFLOW_EXPERIMENT_NAME") - - if self.experiment_name is not None: - # First check if experiment with name exists. - experiment = self.client.get_experiment_by_name( - self.experiment_name) - if experiment is not None: - # If it already exists then get the id. - experiment_id = experiment.experiment_id - else: - # If it does not exist, create the experiment. - experiment_id = self.client.create_experiment( - name=self.experiment_name) - else: - # No experiment_name is passed in and name env var is not set. - # Now check the experiment id env var. - experiment_id = os.environ.get("MLFLOW_EXPERIMENT_ID") - # Confirm that an experiment with this id exists. - if experiment_id is None or self.client.get_experiment( - experiment_id) is None: - raise ValueError("No experiment_name passed, " - "MLFLOW_EXPERIMENT_NAME env var is not " - "set, and MLFLOW_EXPERIMENT_ID either " - "is not set or does not exist. Please " - "set one of these to use the " - "MLflowLoggerCallback.") + def setup(self, *args, **kwargs): + # Setup the mlflow logging util. + self.mlflow_util.setup_mlflow( + tracking_uri=self.tracking_uri, + registry_uri=self.registry_uri, + experiment_name=self.experiment_name) if self.tags is None: # Create empty dictionary for tags if not given explicitly self.tags = {} - # At this point, experiment_id should be set. - self.experiment_id = experiment_id - self.save_artifact = self.save_artifact - self._trial_runs = {} def log_trial_start(self, trial: "Trial"): @@ -153,44 +107,34 @@ class MLflowLoggerCallback(LoggerCallback): tags = self.tags.copy() tags["trial_name"] = str(trial) - run = self.client.create_run( - experiment_id=self.experiment_id, tags=tags) + run = self.mlflow_util.start_run(tags=tags, run_name=str(trial)) self._trial_runs[trial] = run.info.run_id run_id = self._trial_runs[trial] # Log the config parameters. 
config = trial.config - - for key, value in config.items(): - self.client.log_param(run_id=run_id, key=key, value=value) + self.mlflow_util.log_params(run_id=run_id, params_to_log=config) def log_trial_result(self, iteration: int, trial: "Trial", result: Dict): step = result.get(TIMESTEPS_TOTAL) or result[TRAINING_ITERATION] run_id = self._trial_runs[trial] - for key, value in result.items(): - try: - value = float(value) - except (ValueError, TypeError): - logger.debug("Cannot log key {} with value {} since the " - "value cannot be converted to float.".format( - key, value)) - continue - self.client.log_metric( - run_id=run_id, key=key, value=value, step=step) + self.mlflow_util.log_metrics( + run_id=run_id, metrics_to_log=result, step=step) def log_trial_end(self, trial: "Trial", failed: bool = False): run_id = self._trial_runs[trial] # Log the artifact if set_artifact is set to True. - if self.save_artifact: - self.client.log_artifacts(run_id, local_dir=trial.logdir) + if self.should_save_artifact: + self.mlflow_util.save_artifacts(run_id=run_id, dir=trial.logdir) # Stop the run once trial finishes. status = "FINISHED" if not failed else "FAILED" - self.client.set_terminated(run_id=run_id, status=status) + self.mlflow_util.end_run(run_id=run_id, status=status) +@Deprecated class MLflowLogger(Logger): """MLflow logger using the deprecated Logger API. @@ -198,42 +142,10 @@ class MLflowLogger(Logger): or manually set the proper environment variables. """ - _experiment_logger_cls = MLflowLoggerCallback - def _init(self): - mlflow = _import_mlflow() - logger_config = self.config.pop("logger_config", {}) - tracking_uri = logger_config.get("mlflow_tracking_uri") - registry_uri = logger_config.get("mlflow_registry_uri") - - experiment_id = logger_config.get("mlflow_experiment_id") - if experiment_id is None or not mlflow.get_experiment(experiment_id): - raise ValueError( - "You must provide a valid `mlflow_experiment_id` " - "in your `logger_config` dict in the `config` " - "dict passed to `tune.run`. " - "Are you sure you passed in a `experiment_id` and " - "the experiment exists?") - else: - experiment_name = mlflow.get_experiment(experiment_id).name - - self._trial_experiment_logger = self._experiment_logger_cls( - tracking_uri, registry_uri, experiment_name) - - self._trial_experiment_logger.setup() - - self._trial_experiment_logger.log_trial_start(self.trial) - - def on_result(self, result: Dict): - self._trial_experiment_logger.log_trial_result( - iteration=result.get(TRAINING_ITERATION), - trial=self.trial, - result=result) - - def close(self): - self._trial_experiment_logger.log_trial_end( - trial=self.trial, failed=False) - del self._trial_experiment_logger + raise DeprecationWarning("The legacy MLflowLogger has been " + "deprecated. Use the MLflowLoggerCallback " + "instead.") def mlflow_mixin(func: Callable): @@ -330,9 +242,6 @@ def mlflow_mixin(func: Callable): } }) """ - if _import_mlflow() is None: - raise RuntimeError("MLflow has not been installed. Please `pip " - "install mlflow` to use the mlflow_mixin.") if ray.util.client.ray.is_connected(): logger.warning("When using mlflow_mixin with Ray Client, " "it is recommended to use a remote tracking " @@ -349,11 +258,7 @@ def mlflow_mixin(func: Callable): class MLflowTrainableMixin: def __init__(self, config: Dict, *args, **kwargs): - self._mlflow = _import_mlflow() - - if self._mlflow is None: - raise RuntimeError("MLflow has not been installed. 
Please `pip " - "install mlflow` to use the mlflow_mixin.") + self.mlflow_util = MLflowLoggerUtil() if not isinstance(self, Trainable): raise ValueError( @@ -380,47 +285,29 @@ class MLflowTrainableMixin: "passed in. Make sure to include a `mlflow` " "key in your `config` dict containing at " "least a `tracking_uri`") - self._mlflow.set_tracking_uri(tracking_uri) # Set the tracking token if one is passed in. tracking_token = mlflow_config.pop("token", None) - if tracking_token is not None: - os.environ["MLFLOW_TRACKING_TOKEN"] = tracking_token - # First see if experiment_id is passed in. experiment_id = mlflow_config.pop("experiment_id", None) - if experiment_id is None or self._mlflow.get_experiment( - experiment_id) is None: - logger.debug("Either no experiment_id is passed in, or the " - "experiment with the given id does not exist. " - "Checking experiment_name") - # Check for name. - experiment_name = mlflow_config.pop("experiment_name", None) - if experiment_name is None: - raise ValueError( - "MLflow mixin specified but no " - "experiment_name or experiment_id has been " - "passed in. Make sure to include a `mlflow` " - "key in your `config` dict containing at " - "least a `experiment_name` or `experiment_id` " - "specification.") - experiment = self._mlflow.get_experiment_by_name(experiment_name) - if experiment is not None: - # Experiment with this name exists. - experiment_id = experiment.experiment_id - else: - raise ValueError("No experiment with the given " - "name: {} or id: {} currently exists. Make " - "sure to first start the MLflow experiment " - "before calling tune.run.".format( - experiment_name, experiment_id)) - self.experiment_id = experiment_id + experiment_name = mlflow_config.pop("experiment_name", None) + + # This initialization happens in each of the Trainables/workers. + # So we have to set `create_experiment_if_not_exists` to False. + # Otherwise there might be race conditions when each worker tries to + # create the same experiment. + # For the mixin, the experiment must be created beforehand. 
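+        # (e.g. by calling ``mlflow.create_experiment()`` or through the
+        # MLflow UI before launching the Tune run.)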
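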
+ self.mlflow_util.setup_mlflow( + tracking_uri=tracking_uri, + experiment_id=experiment_id, + experiment_name=experiment_name, + tracking_token=tracking_token, + create_experiment_if_not_exists=False) run_name = self.trial_name + "_" + self.trial_id run_name = run_name.replace("/", "_") - self._mlflow.start_run( - experiment_id=self.experiment_id, run_name=run_name) + self.mlflow_util.start_run(set_active=True, run_name=run_name) def stop(self): - self._mlflow.end_run() + self.mlflow_util.end_run() diff --git a/python/ray/tune/tests/test_integration_mlflow.py b/python/ray/tune/tests/test_integration_mlflow.py index 65225844a..df3bc1ab5 100644 --- a/python/ray/tune/tests/test_integration_mlflow.py +++ b/python/ray/tune/tests/test_integration_mlflow.py @@ -1,12 +1,16 @@ import os +import tempfile import unittest from collections import namedtuple from unittest.mock import patch from ray.tune.function_runner import wrap_function -from ray.tune.integration.mlflow import MLflowLoggerCallback, MLflowLogger, \ +from ray.tune.integration.mlflow import MLflowLoggerCallback, \ mlflow_mixin, MLflowTrainableMixin +from mlflow.tracking import MlflowClient +from ray.util.ml_utils.mlflow import MLflowLoggerUtil + class MockTrial( namedtuple("MockTrial", @@ -18,100 +22,10 @@ class MockTrial( return self.trial_name -MockRunInfo = namedtuple("MockRunInfo", ["run_id"]) - - -class MockRun: - def __init__(self, run_id, tags=None): - self.run_id = run_id - self.tags = tags - self.info = MockRunInfo(run_id) - self.params = [] - self.metrics = [] - self.artifacts = [] - - def log_param(self, key, value): - self.params.append({key: value}) - - def log_metric(self, key, value): - self.metrics.append({key: value}) - - def log_artifact(self, artifact): - self.artifacts.append(artifact) - - def set_terminated(self, status): - self.terminated = True - self.status = status - - -MockExperiment = namedtuple("MockExperiment", ["name", "experiment_id"]) - - -class MockMlflowClient: - def __init__(self, tracking_uri=None, registry_uri=None): - self.tracking_uri = tracking_uri - self.registry_uri = registry_uri - self.experiments = [MockExperiment("existing_experiment", 0)] - self.runs = {0: []} - self.active_run = None - - def set_tracking_uri(self, tracking_uri): - self.tracking_uri = tracking_uri - - def get_experiment_by_name(self, name): - try: - index = self.experiment_names.index(name) - return self.experiments[index] - except ValueError: - return None - - def get_experiment(self, experiment_id): - experiment_id = int(experiment_id) - try: - return self.experiments[experiment_id] - except IndexError: - return None - - def create_experiment(self, name): - experiment_id = len(self.experiments) - self.experiments.append(MockExperiment(name, experiment_id)) - self.runs[experiment_id] = [] - return experiment_id - - def create_run(self, experiment_id, tags=None): - experiment_runs = self.runs[experiment_id] - run_id = (experiment_id, len(experiment_runs)) - run = MockRun(run_id=run_id, tags=tags) - experiment_runs.append(run) - return run - - def start_run(self, experiment_id, run_name): - # Creates new run and sets it as active. 
- run = self.create_run(experiment_id) - self.active_run = run - - def get_mock_run(self, run_id): - return self.runs[run_id[0]][run_id[1]] - - def log_param(self, run_id, key, value): - run = self.get_mock_run(run_id) - run.log_param(key, value) - - def log_metric(self, run_id, key, value, step): - run = self.get_mock_run(run_id) - run.log_metric(key, value) - - def log_artifacts(self, run_id, local_dir): - run = self.get_mock_run(run_id) - run.log_artifact(local_dir) - - def set_terminated(self, run_id, status): - run = self.get_mock_run(run_id) - run.set_terminated(status) - - @property - def experiment_names(self): - return [e.name for e in self.experiments] +class MockMLflowLoggerUtil(MLflowLoggerUtil): + def save_artifacts(self, dir, run_id): + self.artifact_saved = True + self.artifact_info = {"dir": dir, "run_id": run_id} def clear_env_vars(): @@ -122,87 +36,104 @@ def clear_env_vars(): class MLflowTest(unittest.TestCase): - @patch("mlflow.tracking.MlflowClient", MockMlflowClient) + def setUp(self): + self.tracking_uri = tempfile.mkdtemp() + self.registry_uri = tempfile.mkdtemp() + + client = MlflowClient( + tracking_uri=self.tracking_uri, registry_uri=self.registry_uri) + client.create_experiment(name="existing_experiment") + assert client.get_experiment_by_name( + "existing_experiment").experiment_id == "0" + def testMlFlowLoggerCallbackConfig(self): # Explicitly pass in all args. logger = MLflowLoggerCallback( - tracking_uri="test1", - registry_uri="test2", + tracking_uri=self.tracking_uri, + registry_uri=self.registry_uri, experiment_name="test_exp") logger.setup() - self.assertEqual(logger.client.tracking_uri, "test1") - self.assertEqual(logger.client.registry_uri, "test2") - self.assertListEqual(logger.client.experiment_names, - ["existing_experiment", "test_exp"]) - self.assertEqual(logger.experiment_id, 1) + self.assertEqual(logger.mlflow_util._mlflow.get_tracking_uri(), + self.tracking_uri) + self.assertEqual(logger.mlflow_util._mlflow.get_registry_uri(), + self.registry_uri) + self.assertListEqual( + [e.name for e in logger.mlflow_util._mlflow.list_experiments()], + ["existing_experiment", "test_exp"]) + self.assertEqual(logger.mlflow_util.experiment_id, "1") # Check if client recognizes already existing experiment. - logger = MLflowLoggerCallback(experiment_name="existing_experiment") + logger = MLflowLoggerCallback( + experiment_name="existing_experiment", + tracking_uri=self.tracking_uri, + registry_uri=self.registry_uri) logger.setup() - self.assertListEqual(logger.client.experiment_names, - ["existing_experiment"]) - self.assertEqual(logger.experiment_id, 0) + self.assertEqual(logger.mlflow_util.experiment_id, "0") # Pass in experiment name as env var. clear_env_vars() os.environ["MLFLOW_EXPERIMENT_NAME"] = "test_exp" - logger = MLflowLoggerCallback() + logger = MLflowLoggerCallback( + tracking_uri=self.tracking_uri, registry_uri=self.registry_uri) logger.setup() - self.assertListEqual(logger.client.experiment_names, - ["existing_experiment", "test_exp"]) - self.assertEqual(logger.experiment_id, 1) + self.assertEqual(logger.mlflow_util.experiment_id, "1") # Pass in existing experiment name as env var. 
         clear_env_vars()
         os.environ["MLFLOW_EXPERIMENT_NAME"] = "existing_experiment"
-        logger = MLflowLoggerCallback()
+        logger = MLflowLoggerCallback(
+            tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
         logger.setup()
-        self.assertListEqual(logger.client.experiment_names,
-                             ["existing_experiment"])
-        self.assertEqual(logger.experiment_id, 0)
+        self.assertEqual(logger.mlflow_util.experiment_id, "0")

         # Pass in existing experiment id as env var.
         clear_env_vars()
         os.environ["MLFLOW_EXPERIMENT_ID"] = "0"
-        logger = MLflowLoggerCallback()
+        logger = MLflowLoggerCallback(
+            tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
         logger.setup()
-        self.assertListEqual(logger.client.experiment_names,
-                             ["existing_experiment"])
-        self.assertEqual(logger.experiment_id, "0")
+        self.assertEqual(logger.mlflow_util.experiment_id, "0")

         # Pass in non existing experiment id as env var.
+        # This should raise an error since no experiment with this id
+        # exists and no experiment name is provided as a fallback.
         clear_env_vars()
         os.environ["MLFLOW_EXPERIMENT_ID"] = "500"
         with self.assertRaises(ValueError):
-            logger = MLflowLoggerCallback()
+            logger = MLflowLoggerCallback(
+                tracking_uri=self.tracking_uri,
+                registry_uri=self.registry_uri)
             logger.setup()

-        # Experiment name env var should take precedence over id env var.
+        # Experiment id env var should take precedence over name env var.
         clear_env_vars()
         os.environ["MLFLOW_EXPERIMENT_NAME"] = "test_exp"
         os.environ["MLFLOW_EXPERIMENT_ID"] = "0"
-        logger = MLflowLoggerCallback()
+        logger = MLflowLoggerCallback(
+            tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
         logger.setup()
-        self.assertListEqual(logger.client.experiment_names,
-                             ["existing_experiment", "test_exp"])
-        self.assertEqual(logger.experiment_id, 1)
+        self.assertEqual(logger.mlflow_util.experiment_id, "0")

         # Using tags
         tags = {"user_name": "John", "git_commit_hash": "abc123"}
         clear_env_vars()
         os.environ["MLFLOW_EXPERIMENT_NAME"] = "test_tags"
         os.environ["MLFLOW_EXPERIMENT_ID"] = "0"
-        logger = MLflowLoggerCallback(tags=tags)
+        logger = MLflowLoggerCallback(
+            tracking_uri=self.tracking_uri,
+            registry_uri=self.registry_uri,
+            tags=tags)
         logger.setup()
         self.assertEqual(logger.tags, tags)

-    @patch("mlflow.tracking.MlflowClient", MockMlflowClient)
+    @patch("ray.tune.integration.mlflow.MLflowLoggerUtil",
+           MockMLflowLoggerUtil)
     def testMlFlowLoggerLogging(self):
         clear_env_vars()
-        trial_config = {"par1": 4, "par2": 9.0}
+        trial_config = {"par1": "a", "par2": "b"}
         trial = MockTrial(trial_config, "trial1", 0, "artifact")

         logger = MLflowLoggerCallback(
+            tracking_uri=self.tracking_uri,
+            registry_uri=self.registry_uri,
             experiment_name="test1",
             save_artifact=True,
             tags={"hello": "world"})
@@ -210,20 +141,24 @@ class MLflowTest(unittest.TestCase):
         # Check if run is created with proper tags.
         logger.on_trial_start(iteration=0, trials=[], trial=trial)

-        # New run should be created for this trial with correct tag.
-        mock_run = logger.client.runs[1][0]
-        self.assertDictEqual(mock_run.tags, {
+        all_runs = logger.mlflow_util._mlflow.search_runs(experiment_ids=["1"])
+        self.assertEqual(len(all_runs), 1)
+        # all_runs is a pandas dataframe.
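+        # Convert it to a list of records to look up the new run's id.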
+ all_runs = all_runs.to_dict(orient="records") + run = logger.mlflow_util._mlflow.get_run(all_runs[0]["run_id"]) + self.assertDictEqual(run.data.tags, { "hello": "world", - "trial_name": "trial1" + "trial_name": "trial1", + "mlflow.runName": "trial1" }) - self.assertTupleEqual(mock_run.run_id, (1, 0)) - self.assertTupleEqual(logger._trial_runs[trial], mock_run.run_id) + self.assertEqual(logger._trial_runs[trial], run.info.run_id) # Params should be logged. - self.assertListEqual(mock_run.params, [{"par1": 4}, {"par2": 9}]) + self.assertDictEqual(run.data.params, trial_config) # When same trial is started again, new run should not be created. logger.on_trial_start(iteration=0, trials=[], trial=trial) - self.assertEqual(len(logger.client.runs[1]), 1) + all_runs = logger.mlflow_util._mlflow.search_runs(experiment_ids=["1"]) + self.assertEqual(len(all_runs), 1) # Check metrics are logged properly. result = { @@ -233,55 +168,22 @@ class MLflowTest(unittest.TestCase): "training_iteration": 0 } logger.on_trial_result(0, [], trial, result) - mock_run = logger.client.runs[1][0] + run = logger.mlflow_util._mlflow.get_run(run_id=run.info.run_id) # metric3 is not logged since it cannot be converted to float. - self.assertListEqual(mock_run.metrics, [{ - "metric1": 0.8 - }, { - "metric2": 1.0 - }, { + self.assertDictEqual(run.data.metrics, { + "metric1": 0.8, + "metric2": 1.0, "training_iteration": 0 - }]) + }) # Check that artifact is logged on termination. logger.on_trial_complete(0, [], trial) - mock_run = logger.client.runs[1][0] - self.assertListEqual(mock_run.artifacts, ["artifact"]) - self.assertTrue(mock_run.terminated) - self.assertEqual(mock_run.status, "FINISHED") + self.assertTrue(logger.mlflow_util.artifact_saved) + self.assertDictEqual(logger.mlflow_util.artifact_info, { + "dir": "artifact", + "run_id": run.info.run_id + }) - @patch("mlflow.tracking.MlflowClient", MockMlflowClient) - def testMlFlowLegacyLoggerConfig(self): - mlflow = MockMlflowClient() - with patch.dict("sys.modules", mlflow=mlflow): - clear_env_vars() - trial_config = {"par1": 4, "par2": 9.0} - trial = MockTrial(trial_config, "trial1", 0, "artifact") - - # No experiment_id is passed in config, should raise an error. - with self.assertRaises(ValueError): - logger = MLflowLogger(trial_config, "/tmp", trial) - - trial_config.update({ - "logger_config": { - "mlflow_tracking_uri": "test_tracking_uri", - "mlflow_experiment_id": 0 - } - }) - trial = MockTrial(trial_config, "trial2", 1, "artifact") - logger = MLflowLogger(trial_config, "/tmp", trial) - - experiment_logger = logger._trial_experiment_logger - client = experiment_logger.client - self.assertEqual(client.tracking_uri, "test_tracking_uri") - # Check to make sure that a run was created on experiment_id 0. - self.assertEqual(len(client.runs[0]), 1) - mock_run = client.runs[0][0] - self.assertDictEqual(mock_run.tags, {"trial_name": "trial2"}) - self.assertListEqual(mock_run.params, [{"par1": 4}, {"par2": 9}]) - - @patch("ray.tune.integration.mlflow._import_mlflow", - lambda: MockMlflowClient()) def testMlFlowMixinConfig(self): clear_env_vars() trial_config = {"par1": 4, "par2": 9.0} @@ -294,40 +196,24 @@ class MLflowTest(unittest.TestCase): # No MLflow config passed in. with self.assertRaises(ValueError): - wrapped = wrap_function(train_fn)(trial_config) + wrap_function(train_fn)(trial_config) trial_config.update({"mlflow": {}}) # No tracking uri or experiment_id/name passed in. 
         with self.assertRaises(ValueError):
-            wrapped = wrap_function(train_fn)(trial_config)
+            wrap_function(train_fn)(trial_config)

-        trial_config["mlflow"].update({
-            "tracking_uri": "test_tracking_uri",
-            "experiment_name": "existing_experiment"
-        })
-        wrapped = wrap_function(train_fn)(trial_config)
-        client = wrapped._mlflow
-        self.assertEqual(client.tracking_uri, "test_tracking_uri")
-        self.assertTupleEqual(client.active_run.run_id, (0, 0))
-
-        with patch("ray.tune.integration.mlflow._import_mlflow",
-                   lambda: client):
-            train_fn.__mixins__ = (MLflowTrainableMixin, )
-            wrapped = wrap_function(train_fn)(trial_config)
-            client = wrapped._mlflow
-            self.assertTupleEqual(client.active_run.run_id, (0, 1))
-
-        # Set to experiment that does not already exist.
-        # New experiment should be created.
-        trial_config["mlflow"]["experiment_name"] = "new_experiment"
-        with self.assertRaises(ValueError):
-            wrapped = wrap_function(train_fn)(trial_config)
+        # Set to experiment that does not already exist.
+        # The mixin does not create new experiments, so this should
+        # raise an error.
+        trial_config["mlflow"]["experiment_name"] = "new_experiment"
+        with self.assertRaises(ValueError):
+            wrap_function(train_fn)(trial_config)


 if __name__ == "__main__":
diff --git a/python/ray/util/ml_utils/BUILD b/python/ray/util/ml_utils/BUILD
new file mode 100644
index 000000000..8f43babe6
--- /dev/null
+++ b/python/ray/util/ml_utils/BUILD
@@ -0,0 +1,18 @@
+# --------------------------------------------------------------------
+# Tests from the python/ray/util/ml_utils/tests directory.
+# Please keep these sorted alphabetically.
+# --------------------------------------------------------------------
+py_test(
+    name = "test_mlflow",
+    size = "medium",
+    srcs = ["tests/test_mlflow.py"],
+    tags = ["team:ml", "exclusive"],
+    deps = [":ml_util_lib"]
+)
+
+# This is a dummy test dependency that causes the above tests to be
+# re-run if any of these files changes.
+py_library(
+    name = "ml_util_lib",
+    srcs = glob(["**/*.py"], exclude=["tests/*.py"]),
+)
diff --git a/python/ray/util/ml_utils/mlflow.py b/python/ray/util/ml_utils/mlflow.py
new file mode 100644
index 000000000..0ecc62c7b
--- /dev/null
+++ b/python/ray/util/ml_utils/mlflow.py
@@ -0,0 +1,284 @@
+import os
+import logging
+from typing import Dict, Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from mlflow.entities import Run
+    from mlflow.tracking import MlflowClient
+
+logger = logging.getLogger(__name__)
+
+
+class MLflowLoggerUtil:
+    """Util class for setting up and logging to MLflow.
+
+    Use this util for any library that needs MLflow logging/tracking logic
+    such as Ray Tune or Ray Train.
+    """
+
+    def __init__(self):
+        import mlflow
+        self._mlflow = mlflow
+        self.experiment_id = None
+
+    def setup_mlflow(
+            self,
+            tracking_uri: Optional[str] = None,
+            registry_uri: Optional[str] = None,
+            experiment_id: Optional[str] = None,
+            experiment_name: Optional[str] = None,
+            tracking_token=None,
+            create_experiment_if_not_exists: bool = True,
+    ):
+        """
+        Sets up MLflow.
+
+        Sets the MLflow tracking URI, tracking token, and registry URI.
+        Also sets the MLflow experiment that the logger should use, and
+        possibly creates a new experiment if it does not exist.
+
+        Args:
+            tracking_uri (str): The tracking URI for the MLflow tracking
+                server.
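+                If None, MLflow falls back to its default tracking URI
+                (typically a local ``mlruns`` directory).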
+            registry_uri (str): The registry URI for the MLflow model
+                registry.
+            experiment_id (str): The id of an already existing MLflow
+                experiment to use for logging. If None is passed in
+                here and the MLFLOW_EXPERIMENT_ID environment variable is
+                not set, or the experiment with this id does not exist,
+                ``experiment_name`` will be used instead. This argument takes
+                precedence over ``experiment_name`` if both are passed in.
+            experiment_name (str): The experiment name to use for logging.
+                If None is passed in here, the MLFLOW_EXPERIMENT_NAME
+                environment variable is used to determine the experiment
+                name.
+                If the experiment with the name already exists with MLflow,
+                it will be reused. If not, a new experiment will be created
+                with the provided name if
+                ``create_experiment_if_not_exists`` is set to True.
+            tracking_token: A token passed through the
+                MLFLOW_TRACKING_TOKEN environment variable for
+                authenticating with the tracking server.
+            create_experiment_if_not_exists (bool): Whether to create an
+                experiment with the provided name if it does not already
+                exist. Defaults to True.
+        """
+        if tracking_token:
+            os.environ["MLFLOW_TRACKING_TOKEN"] = tracking_token
+
+        self._mlflow.set_tracking_uri(tracking_uri)
+        self._mlflow.set_registry_uri(registry_uri)
+
+        # First check experiment_id.
+        experiment_id = experiment_id if experiment_id is not None else \
+            os.environ.get("MLFLOW_EXPERIMENT_ID")
+        if experiment_id is not None:
+            from mlflow.exceptions import MlflowException
+            try:
+                self._mlflow.get_experiment(experiment_id=experiment_id)
+                logger.debug(f"Experiment with provided id {experiment_id} "
+                             "exists. Setting that as the experiment.")
+                self.experiment_id = experiment_id
+                return
+            except MlflowException:
+                pass
+
+        # Then check experiment_name.
+        experiment_name = experiment_name if experiment_name is not None \
+            else os.environ.get("MLFLOW_EXPERIMENT_NAME")
+        if experiment_name is not None and \
+                self._mlflow.get_experiment_by_name(name=experiment_name):
+            logger.debug(f"Experiment with provided name {experiment_name} "
+                         "exists. Setting that as the experiment.")
+            self.experiment_id = self._mlflow.get_experiment_by_name(
+                experiment_name).experiment_id
+            return
+
+        # An experiment with the provided id or name does not exist.
+        # Create a new experiment if applicable.
+        if experiment_name and create_experiment_if_not_exists:
+            logger.debug("Existing experiment not found. Creating new "
+                         f"experiment with name: {experiment_name}")
+            self.experiment_id = self._mlflow.create_experiment(
+                name=experiment_name)
+            return
+
+        if create_experiment_if_not_exists:
+            raise ValueError(f"Experiment with the provided experiment_id: "
+                             f"{experiment_id} does not exist and no "
+                             f"experiment_name provided. At least one of "
+                             f"these has to be provided.")
+        else:
+            raise ValueError(f"Experiment with the provided experiment_id: "
+                             f"{experiment_id} or experiment_name: "
+                             f"{experiment_name} does not exist. Please "
+                             f"create an MLflow experiment and provide "
+                             f"either its id or name.")
+
+    def _parse_dict(self, dict_to_log: Dict) -> Dict:
+        """Parses the provided dict, converting all values to floats.
+
+        MLflow can only log metrics that are floats. This does not apply to
+        logging parameters or artifacts.
+
+        Args:
+            dict_to_log (Dict): The dictionary containing the metrics to log.
+
+        Returns:
+            A dictionary containing the metrics to log, with all values
+            converted to floats and non-convertible values skipped.
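+
+            For example, ``{"loss": "0.5", "tag": "exp1"}`` is parsed to
+            ``{"loss": 0.5}``; ``"tag"`` is skipped since it cannot be
+            converted to a float.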
+ """ + new_dict = {} + for key, value in dict_to_log.items(): + try: + value = float(value) + new_dict[key] = value + except (ValueError, TypeError): + logger.debug("Cannot log key {} with value {} since the " + "value cannot be converted to float.".format( + key, value)) + continue + + return new_dict + + def start_run(self, + run_name: Optional[str] = None, + tags: Optional[Dict] = None, + set_active: bool = False) -> "Run": + """Starts a new run and possibly sets it as the active run. + + Args: + tags (Optional[Dict]): Tags to set for the new run. + set_active (bool): Whether to set the new run as the active run. + If an active run already exists, then that run is returned. + + Returns: + The newly created MLflow run. + """ + + if set_active: + return self._start_active_run(run_name=run_name, tags=tags) + + from mlflow.utils.mlflow_tags import MLFLOW_RUN_NAME + + client = self._get_client() + tags[MLFLOW_RUN_NAME] = run_name + run = client.create_run(experiment_id=self.experiment_id, tags=tags) + + return run + + def _start_active_run(self, + run_name: Optional[str] = None, + tags: Optional[Dict] = None) -> "Run": + """Starts a run and sets it as the active run if one does not exist. + + If an active run already exists, then returns it. + """ + active_run = self._mlflow.active_run() + if active_run: + return active_run + + return self._mlflow.start_run(run_name=run_name, tags=tags) + + def _run_exists(self, run_id: str) -> bool: + """Check if run with the provided id exists.""" + from mlflow.exceptions import MlflowException + + try: + self._mlflow.get_run(run_id=run_id) + return True + except MlflowException: + return False + + def _get_client(self) -> "MlflowClient": + """Returns an ml.tracking.MlflowClient instance to use for logging.""" + tracking_uri = self._mlflow.get_tracking_uri() + registry_uri = self._mlflow.get_registry_uri() + + from mlflow.tracking import MlflowClient + + return MlflowClient( + tracking_uri=tracking_uri, registry_uri=registry_uri) + + def log_params(self, params_to_log: Dict, run_id: Optional[str] = None): + """Logs the provided parameters to the run specified by run_id. + + If no ``run_id`` is passed in, then logs to the current active run. + If there is not active run, then creates a new run and sets it as + the active run. + + Args: + params_to_log (Dict): Dictionary of parameters to log. + run_id (Optional[str]): The ID of the run to log to. + """ + + if run_id and self._run_exists(run_id): + client = self._get_client() + for key, value in params_to_log.items(): + client.log_param(run_id=run_id, key=key, value=value) + + else: + for key, value in params_to_log.items(): + self._mlflow.log_param(key=key, value=value) + + def log_metrics(self, + step, + metrics_to_log: Dict, + run_id: Optional[str] = None): + """Logs the provided metrics to the run specified by run_id. + + + If no ``run_id`` is passed in, then logs to the current active run. + If there is not active run, then creates a new run and sets it as + the active run. + + Args: + metrics_to_log (Dict): Dictionary of metrics to log. + run_id (Optional[str]): The ID of the run to log to. 
+ """ + metrics_to_log = self._parse_dict(metrics_to_log) + + if run_id and self._run_exists(run_id): + client = self._get_client() + for key, value in metrics_to_log.items(): + client.log_metric( + run_id=run_id, key=key, value=value, step=step) + + else: + for key, value in metrics_to_log.items(): + self._mlflow.log_metric(key=key, value=value, step=step) + + def save_artifacts(self, dir: str, run_id: Optional[str] = None): + """Saves directory as artifact to the run specified by run_id. + + If no ``run_id`` is passed in, then saves to the current active run. + If there is not active run, then creates a new run and sets it as + the active run. + + Args: + dir (str): Path to directory containing the files to save. + run_id (Optional[str]): The ID of the run to log to. + """ + if run_id and self._run_exists(run_id): + client = self._get_client() + client.log_artifacts(run_id=run_id, local_dir=dir) + else: + self._mlflow.log_artifacts(local_dir=dir) + + def end_run(self, status: Optional[str] = None, run_id=None): + """Terminates the run specified by run_id. + + If no ``run_id`` is passed in, then terminates the + active run if one exists. + + Args: + status (Optional[str]): The status to set when terminating the run. + run_id (Optional[str]): The ID of the run to terminate. + + """ + if run_id and self._run_exists(run_id) and not ( + self._mlflow.active_run() + and self._mlflow.active_run().info.run_id == run_id): + client = self._get_client() + client.set_terminated(run_id=run_id, status=status) + else: + self._mlflow.end_run(status=status) diff --git a/python/ray/util/ml_utils/tests/__init__.py b/python/ray/util/ml_utils/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/python/ray/util/ml_utils/tests/test_mlflow.py b/python/ray/util/ml_utils/tests/test_mlflow.py new file mode 100644 index 000000000..058a57e71 --- /dev/null +++ b/python/ray/util/ml_utils/tests/test_mlflow.py @@ -0,0 +1,112 @@ +import os +import shutil +import tempfile +import unittest + +from ray.util.ml_utils.mlflow import MLflowLoggerUtil + + +class MLflowTest(unittest.TestCase): + def setUp(self): + self.dirpath = tempfile.mkdtemp() + import mlflow + mlflow.set_tracking_uri(self.dirpath) + mlflow.create_experiment(name="existing_experiment") + + self.mlflow_util = MLflowLoggerUtil() + self.tracking_uri = mlflow.get_tracking_uri() + + def tearDown(self): + shutil.rmtree(self.dirpath) + + def test_experiment_id(self): + self.mlflow_util.setup_mlflow( + tracking_uri=self.tracking_uri, experiment_id="0") + assert self.mlflow_util.experiment_id == "0" + + def test_experiment_id_env_var(self): + os.environ["MLFLOW_EXPERIMENT_ID"] = "0" + self.mlflow_util.setup_mlflow(tracking_uri=self.tracking_uri) + assert self.mlflow_util.experiment_id == "0" + del os.environ["MLFLOW_EXPERIMENT_ID"] + + def test_experiment_name(self): + self.mlflow_util.setup_mlflow( + tracking_uri=self.tracking_uri, + experiment_name="existing_experiment") + assert self.mlflow_util.experiment_id == "0" + + def test_experiment_name_env_var(self): + os.environ["MLFLOW_EXPERIMENT_NAME"] = "existing_experiment" + self.mlflow_util.setup_mlflow(tracking_uri=self.tracking_uri) + assert self.mlflow_util.experiment_id == "0" + del os.environ["MLFLOW_EXPERIMENT_NAME"] + + def test_id_precedence(self): + os.environ["MLFLOW_EXPERIMENT_ID"] = "0" + self.mlflow_util.setup_mlflow( + tracking_uri=self.tracking_uri, experiment_name="new_experiment") + assert self.mlflow_util.experiment_id == "0" + del os.environ["MLFLOW_EXPERIMENT_ID"] + + def 
test_new_experiment(self): + self.mlflow_util.setup_mlflow( + tracking_uri=self.tracking_uri, experiment_name="new_experiment") + assert self.mlflow_util.experiment_id == "1" + + def test_setup_fail(self): + with self.assertRaises(ValueError): + self.mlflow_util.setup_mlflow( + tracking_uri=self.tracking_uri, + experiment_name="new_experiment2", + create_experiment_if_not_exists=False) + + def test_log_params(self): + params = {"a": "a"} + self.mlflow_util.setup_mlflow( + tracking_uri=self.tracking_uri, experiment_name="new_experiment") + run = self.mlflow_util.start_run() + run_id = run.info.run_id + self.mlflow_util.log_params(params_to_log=params, run_id=run_id) + + run = self.mlflow_util._mlflow.get_run(run_id=run_id) + assert run.data.params == params + + params2 = {"b": "b"} + self.mlflow_util.start_run(set_active=True) + self.mlflow_util.log_params(params_to_log=params2, run_id=run_id) + assert self.mlflow_util._mlflow.get_run(run_id=run_id).data.params == { + **params, + **params2 + } + self.mlflow_util.end_run() + + def test_log_metrics(self): + metrics = {"a": 1.0} + self.mlflow_util.setup_mlflow( + tracking_uri=self.tracking_uri, experiment_name="new_experiment") + run = self.mlflow_util.start_run() + run_id = run.info.run_id + self.mlflow_util.log_metrics( + metrics_to_log=metrics, run_id=run_id, step=0) + + run = self.mlflow_util._mlflow.get_run(run_id=run_id) + assert run.data.metrics == metrics + + metrics2 = {"b": 1.0} + self.mlflow_util.start_run(set_active=True) + self.mlflow_util.log_metrics( + metrics_to_log=metrics2, run_id=run_id, step=0) + assert self.mlflow_util._mlflow.get_run( + run_id=run_id).data.metrics == { + **metrics, + **metrics2 + } + self.mlflow_util.end_run() + + +if __name__ == "__main__": + import pytest + import sys + + sys.exit(pytest.main(["-v", __file__])) diff --git a/python/requirements/ml/requirements_train.txt b/python/requirements/ml/requirements_train.txt index f01302faa..d1a68aa4f 100644 --- a/python/requirements/ml/requirements_train.txt +++ b/python/requirements/ml/requirements_train.txt @@ -1,6 +1,7 @@ -r requirements_dl.txt mlflow==1.21.0 +tensorboardX==2.4.1 # Dependencies for Hugging Face examples: # `python/ray/train/examples/transformers/transformers_example.py`