[Train] [Tune] Refactor MLflow (#20802)

Pulls out Tune's MLflow logging logic to a shared MLflow util.
Adds an MLflow logger callback to Ray Train.

Closes #20642
This commit is contained in:
Amog Kamsetty 2021-12-21 17:17:52 -08:00 committed by GitHub
parent 09421a4ca6
commit 57db4640ca
23 changed files with 840 additions and 455 deletions

View file

@ -318,6 +318,13 @@
- DATA_PROCESSING_TESTING=1 PYTHON=3.7 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/workflow/... python/ray/data/...
- label: ":slot_machine: ML Utils tests"
conditions: ["RAY_CI_ML_UTILS_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/travis/install-dependencies.sh
- bazel test --config=ci $(./scripts/bazel_export_options) --build_tests_only python/ray/util/ml_utils/...
- label: ":book: Doc tests and examples"
conditions:
["RAY_CI_PYTHON_AFFECTED", "RAY_CI_TUNE_AFFECTED", "RAY_CI_DOC_AFFECTED"]

View file

@ -148,6 +148,7 @@ if __name__ == "__main__":
RAY_CI_SGD_AFFECTED = 1
RAY_CI_TUNE_AFFECTED = 1
RAY_CI_RLLIB_AFFECTED = 1
RAY_CI_ML_UTILS_AFFECTED = 1
elif re.match("^(python/ray/)?rllib/", changed_file):
RAY_CI_RLLIB_AFFECTED = 1
RAY_CI_RLLIB_DIRECTLY_AFFECTED = 1

View file

@ -25,36 +25,11 @@ import torch.optim as optim
from ray import train
from ray.data.aggregate import Mean, Std
from ray.train import Trainer
from ray.train import TrainingCallback
from ray.train.callbacks.logging import MLflowLoggerCallback
from ray.train.callbacks import TBXLoggerCallback
from torch.nn.parallel import DistributedDataParallel
# TODO(amogkam): Upstream this into Ray Train.
class MLflowCallback(TrainingCallback):
def __init__(self, config):
self.config = config
def handle_result(self, results, **info):
# For each result that's being reported by ``train.report()``,
# we get the result from the rank 0 worker (i.e. first worker) and
# report it to MLflow.
rank_zero_results = results[0]
mlflow.log_metrics(rank_zero_results)
# TODO: fix type hint for logdir
def start_training(self, logdir, **info):
mlflow.start_run(run_name=str(logdir.name))
mlflow.log_params(config)
# TODO: Update TrainCallback to provide logdir in finish_training.
self.logdir = logdir
def finish_training(self, error: bool = False, **info):
# Save the Trainer checkpoints as artifacts to mlflow.
mlflow.log_artifacts(self.logdir)
def make_and_upload_dataset(dir_path):
import random
@ -641,7 +616,11 @@ if __name__ == "__main__":
# and should also create 1 directory per file.
tbx_logdir = "./runs"
os.makedirs(tbx_logdir, exist_ok=True)
callbacks = [TBXLoggerCallback(logdir=tbx_logdir), MLflowCallback(config)]
callbacks = [
TBXLoggerCallback(logdir=tbx_logdir),
MLflowLoggerCallback(
experiment_name="cuj-big-data-training", save_artifact=True)
]
# Remove CPU resource so Datasets can be scheduled.
resources_per_worker = {"CPU": 0, "GPU": 1} if use_gpu else None

View file

@ -72,6 +72,13 @@ TBXLoggerCallback
.. autoclass:: ray.train.callbacks.TBXLoggerCallback
.. _train-api-mlflow-logger-callback:
MLflowLoggerCallback
~~~~~~~~~~~~~~~~~~~~
.. autoclass:: ray.train.callbacks.MLflowLoggerCallback
Checkpointing
-------------

View file

@ -47,11 +47,10 @@ Horovod
Simple example for Horovod (with TensorFlow)
Iterator API Examples
---------------------
Logger/Callback Examples
------------------------
* :doc:`/train/examples/mlflow_fashion_mnist_example`:
Example for using the Iterator API for custom MLFlow integration.
Example for logging training to MLflow via the ``MLflowLoggerCallback``.
Ray Datasets Integration Examples
---------------------------------

View file

@ -317,7 +317,7 @@ Log directories are exposed through the following attributes:
Logs will be written by:
1. :ref:`Logging Callbacks <train-logging-callbacks>`
1. :ref:`Callbacks <train-callbacks>`
2. :ref:`Checkpoints <train-checkpointing>`
.. TODO link to Training Run Iterator API as a 3rd option for logging.
@ -327,6 +327,11 @@ Logs will be written by:
Logging, Monitoring, and Callbacks
----------------------------------
Ray Train has mechanisms to easily collect intermediate results from the training workers during the training run.
It also exposes a :ref:`Callback interface <train-callbacks>` to perform actions on these intermediate results (such as logging, aggregation, and printing).
You can use either the :ref:`built-in callbacks <train-builtin-callbacks>` that Ray Train provides,
or implement a :ref:`custom callback <train-custom-callbacks>` for your use case.
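For example, a training function can report intermediate results with ``train.report``, and any callbacks passed to ``trainer.run`` will receive them. The snippet below is a minimal sketch using the built-in ``JsonLoggerCallback``; the training function and reported metric are placeholders.
.. code-block:: python

    from ray import train
    from ray.train import Trainer
    from ray.train.callbacks import JsonLoggerCallback

    def train_func():
        for i in range(3):
            # Each worker reports intermediate results, which are
            # forwarded to the attached callbacks.
            train.report(epoch=i)

    trainer = Trainer(backend="torch", num_workers=2)
    trainer.start()
    trainer.run(train_func, callbacks=[JsonLoggerCallback()])
    trainer.shutdown()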
Reporting intermediate results
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -393,9 +398,64 @@ Callbacks
~~~~~~~~~
You may want to plug your training code into your favorite experiment management framework.
Ray Train provides an interface to fetch intermediate results and callbacks to process/log your intermediate results.
Ray Train provides an interface to fetch intermediate results and callbacks to process/log your intermediate results
(the values passed into ``train.report(...)``).
You can plug all of these into Ray Train with the following interface:
Ray Train contains built-in callbacks for popular tracking frameworks, or you can implement your own callback via the ``TrainingCallback`` interface.
.. _train-builtin-callbacks:
Built-in Callbacks
++++++++++++++++++
The following ``TrainingCallback``\s are available and will log the intermediate results of the training run.
1. :ref:`train-api-json-logger-callback`
2. :ref:`train-api-tbx-logger-callback`
3. :ref:`train-api-mlflow-logger-callback`
Example: Logging to MLflow and TensorBoard
++++++++++++++++++++++++++++++++++++++++++
**Step 1: Install the necessary packages**
.. code-block:: bash
$ pip install mlflow
$ pip install tensorboardX
**Step 2: Run the following training script**
.. literalinclude:: /../../python/ray/train/examples/mlflow_simple_example.py
:language: python
**Step 3: Visualize the logs**
.. code-block:: bash
# Navigate to the run directory of the trainer.
# For example `cd /home/ray_results/train_2021-09-01_12-00-00/run_001`
$ cd <TRAINER_RUN_DIR>
# View the MLflow UI.
$ mlflow ui
# View the TensorBoard UI.
$ tensorboard --logdir .
.. _train-custom-callbacks:
Custom Callbacks
++++++++++++++++
If the provided callbacks do not cover your desired integrations or use-cases,
you may always implement a custom callback by subclassing ``TrainingCallback``. If
the callback is general enough, please feel welcome to
`add it <https://docs.ray.io/en/master/getting-involved.html>`_ to the ``ray``
`repository <https://github.com/ray-project/ray>`_.
A simple example for creating a callback that will print out results:
.. code-block:: python
@ -422,44 +482,6 @@ You can plug all of these into Ray Train with the following interface:
# [{'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0014500617980957031, '_training_iteration': 3}, {'epoch': 2, '_timestamp': 1630471763, '_time_this_iter_s': 0.0015292167663574219, '_training_iteration': 3}]
trainer.shutdown()
.. Here is a list of callbacks that are supported by Ray Train:
.. * JsonLoggerCallback
.. * TBXLoggerCallback
.. * WandbCallback
.. * MlflowCallback
.. * CSVCallback
.. _train-logging-callbacks:
Logging Callbacks
+++++++++++++++++
The following ``TrainingCallback``\s are available and will write to a file within the
:ref:`log directory <train-log-dir>` of each training run.
1. :ref:`train-api-json-logger-callback`
2. :ref:`train-api-tbx-logger-callback`
Custom Callbacks
++++++++++++++++
If the provided callbacks do not cover your desired integrations or use-cases,
you may always implement a custom callback by subclassing ``TrainingCallback``. If
the callback is general enough, please feel welcome to `add it <https://docs
.ray.io/en/master/getting-involved.html>`_ to the ``ray``
`repository <https://github.com/ray-project/ray>`_.
A simple example for creating a callback that will print out results:
.. code-block:: python
from ray.train import TrainingCallback
class PrintingCallback(TrainingCallback):
def handle_result(self, results: List[Dict], **info):
print(results)
..
Advanced Customization

View file

@ -12,6 +12,15 @@ py_test(
args = ["--smoke-test"]
)
py_test(
name = "mlflow_simple_example",
size = "medium",
main = "examples/mlflow_simple_example.py",
srcs = ["examples/mlflow_simple_example.py"],
tags = ["team:ml", "exclusive"],
deps = [":train_lib"],
)
py_test(
name = "tensorflow_quick_start",
size = "medium",

View file

@ -1,4 +1,8 @@
from ray.train.callbacks.callback import TrainingCallback
from ray.train.callbacks.logging import (JsonLoggerCallback, TBXLoggerCallback)
from ray.train.callbacks.logging import (
JsonLoggerCallback, MLflowLoggerCallback, TBXLoggerCallback)
__all__ = ["TrainingCallback", "JsonLoggerCallback", "TBXLoggerCallback"]
__all__ = [
"TrainingCallback", "JsonLoggerCallback", "MLflowLoggerCallback",
"TBXLoggerCallback"
]

View file

@ -16,12 +16,13 @@ class TrainingCallback(metaclass=abc.ABCMeta):
"""
pass
def start_training(self, logdir: str, **info):
def start_training(self, logdir: str, config: Dict, **info):
"""Called once on training start.
Args:
logdir (str): Path to the file directory where logs
should be persisted.
config (Dict): The config dict passed into ``trainer.run()``.
**info: kwargs dict for forward compatibility.
"""
pass
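# Illustrative sketch (not part of this file): a hypothetical custom callback
# in user code that makes use of the new ``config`` argument to
# ``start_training``. The class name ``ConfigPrintingCallback`` is made up
# for illustration.
from typing import Dict, List

from ray.train import TrainingCallback


class ConfigPrintingCallback(TrainingCallback):
    def start_training(self, logdir: str, config: Dict, **info):
        # ``config`` is the dict passed into ``trainer.run()``.
        print(f"Starting training in {logdir} with config: {config}")

    def handle_result(self, results: List[Dict], **info):
        # ``results`` holds one result dict per training worker.
        print(results)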

View file

@ -1,3 +1,4 @@
import os
from typing import Iterable, List, Optional, Dict, Set, Tuple, Union
import abc
import warnings
@ -11,13 +12,15 @@ from ray.util.ml_utils.dict import flatten_dict
from ray.util.ml_utils.json import SafeFallbackEncoder
from ray.train.callbacks import TrainingCallback
from ray.train.constants import (RESULT_FILE_JSON, TRAINING_ITERATION,
TIME_TOTAL_S, TIMESTAMP, PID)
TIME_TOTAL_S, TIMESTAMP, PID,
TRAIN_CHECKPOINT_SUBDIR)
from ray.util.ml_utils.mlflow import MLflowLoggerUtil
logger = logging.getLogger(__name__)
class TrainingLogdirMixin:
def start_training(self, logdir: str, **info):
def start_training(self, logdir: str):
if self._logdir:
logdir_path = Path(self._logdir)
else:
@ -83,8 +86,8 @@ class TrainingSingleFileLoggingCallback(
raise ValueError("filename cannot be None or empty.")
return logdir_path.joinpath(Path(filename))
def start_training(self, logdir: str, **info):
super().start_training(logdir, **info)
def start_training(self, logdir: str):
super().start_training(logdir)
if not self._filename:
filename = self._default_filename
@ -117,7 +120,7 @@ class JsonLoggerCallback(TrainingSingleFileLoggingCallback):
_default_filename: Union[str, Path] = RESULT_FILE_JSON
def start_training(self, logdir: str, **info):
super().start_training(logdir, **info)
super().start_training(logdir)
# Create a JSON file with an empty list
# that will be later appended to
@ -174,6 +177,96 @@ class TrainingSingleWorkerLoggingCallback(
return worker_to_log
class MLflowLoggerCallback(TrainingSingleWorkerLoggingCallback):
"""MLflow Logger to automatically log Train results and config to MLflow.
MLflow (https://mlflow.org) Tracking is an open source library for
recording and querying experiments. This Ray Train callback
sends information (config parameters, training results & metrics,
and artifacts) to MLflow for automatic experiment tracking.
Args:
tracking_uri (Optional[str]): The tracking URI for where to manage
experiments and runs. This can either be a local file path or a
remote server. If None is passed in, the logdir of the trainer
will be used as the tracking URI.
This arg gets passed directly to mlflow initialization.
registry_uri (Optional[str]): The registry URI that gets passed
directly to mlflow initialization. If None is passed in, the
logdir of the trainer will be used as the registry URI.
experiment_id (Optional[str]): The experiment id of an already
existing experiment. If not
passed in, experiment_name will be used.
experiment_name (Optional[str]): The experiment name to use for this
Train run.
If the experiment with the name already exists with MLflow,
it will be used. If not, a new experiment will be created with
this name. At least one of ``experiment_id`` or
``experiment_name`` must be passed in.
tags (Optional[Dict]): An optional dictionary of string keys and
values to set as tags on the run.
save_artifact (bool): If set to True, automatically save the
Train checkpoints as artifacts to the corresponding run in
MLflow.
logdir (Optional[str]): Path to the directory where logs should be
written. If None, this will be set by the Trainer. If no
tracking URI or registry URI is passed in, the logdir will be
used for both.
worker_to_log (int): Worker index to log. By default, will log the
worker with index 0.
"""
def __init__(self,
tracking_uri: Optional[str] = None,
registry_uri: Optional[str] = None,
experiment_id: Optional[str] = None,
experiment_name: Optional[str] = None,
tags: Optional[Dict] = None,
save_artifact: bool = False,
logdir: Optional[str] = None,
worker_to_log: int = 0):
super().__init__(logdir=logdir, worker_to_log=worker_to_log)
self.tracking_uri = tracking_uri
self.registry_uri = registry_uri
self.experiment_id = experiment_id
self.experiment_name = experiment_name
self.tags = tags
self.save_artifact = save_artifact
self.mlflow_util = MLflowLoggerUtil()
def start_training(self, logdir: str, config: Dict, **info):
super().start_training(logdir=logdir)
tracking_uri = self.tracking_uri or os.path.join(
str(self.logdir), "mlruns")
registry_uri = self.registry_uri or os.path.join(
str(self.logdir), "mlruns")
self.mlflow_util.setup_mlflow(
tracking_uri=tracking_uri,
registry_uri=registry_uri,
experiment_id=self.experiment_id,
experiment_name=self.experiment_name,
create_experiment_if_not_exists=True)
self.mlflow_util.start_run(tags=self.tags, set_active=True)
self.mlflow_util.log_params(params_to_log=config)
def handle_result(self, results: List[Dict], **info):
result = results[self._workers_to_log]
self.mlflow_util.log_metrics(
metrics_to_log=result, step=result[TRAINING_ITERATION])
def finish_training(self, error: bool = False, **info):
checkpoint_dir = self.logdir.joinpath(TRAIN_CHECKPOINT_SUBDIR)
if self.save_artifact and checkpoint_dir.exists():
self.mlflow_util.save_artifacts(dir=str(checkpoint_dir))
self.mlflow_util.end_run(status="FAILED" if error else "FINISHED")
class TBXLoggerCallback(TrainingSingleWorkerLoggingCallback):
"""Logs Train results in TensorboardX format.
@ -189,7 +282,7 @@ class TBXLoggerCallback(TrainingSingleWorkerLoggingCallback):
IGNORE_KEYS: Set[str] = {PID, TIMESTAMP, TIME_TOTAL_S, TRAINING_ITERATION}
def start_training(self, logdir: str, **info):
super().start_training(logdir, **info)
super().start_training(logdir)
try:
from tensorboardX import SummaryWriter

View file

@ -7,7 +7,8 @@ from pathlib import Path
from typing import List, Optional, Dict, Union, Callable
from ray import cloudpickle
from ray.train.constants import TIMESTAMP, TUNE_INSTALLED
from ray.train.constants import TIMESTAMP, TUNE_INSTALLED, \
TRAIN_CHECKPOINT_SUBDIR
from ray.train.constants import TUNE_CHECKPOINT_FILE_NAME, \
TUNE_CHECKPOINT_ID
from ray.train.session import TrainingResult
@ -256,7 +257,7 @@ class CheckpointManager:
@property
def latest_checkpoint_dir(self) -> Optional[Path]:
"""Path to the latest checkpoint directory."""
checkpoint_dir = Path("checkpoints")
checkpoint_dir = Path(TRAIN_CHECKPOINT_SUBDIR)
return construct_path(checkpoint_dir, self.run_dir)
@property

View file

@ -33,9 +33,12 @@ RESULT_FILE_JSON = "results.json"
# Default directory where all Train logs, checkpoints, etc. will be stored.
DEFAULT_RESULTS_DIR = Path("~/ray_results").expanduser()
# File name to use for checkpoints saved with Tune
# File name to use for checkpoints saved with Tune.
TUNE_CHECKPOINT_FILE_NAME = "checkpoint"
# The name of the subdirectory inside the trainer run_dir to store checkpoints.
TRAIN_CHECKPOINT_SUBDIR = "checkpoints"
# The key to use to specify the checkpoint id for Tune.
# This needs to be added to the checkpoint dictionary so if the Tune trial
# is restarted, the checkpoint_id can continue to increment.

View file

@ -1,30 +1,26 @@
import argparse
import mlflow
from ray.train import Trainer
from ray.train.examples.train_fashion_mnist_example import train_func
from ray.train.callbacks.logging import MLflowLoggerCallback
def main(num_workers=2, use_gpu=False):
mlflow.set_experiment("train_torch_fashion_mnist")
trainer = Trainer(
backend="torch", num_workers=num_workers, use_gpu=use_gpu)
trainer.start()
iterator = trainer.run_iterator(
final_results = trainer.run(
train_func=train_func,
config={
"lr": 1e-3,
"batch_size": 64,
"epochs": 4
})
},
callbacks=[
MLflowLoggerCallback(experiment_name="train_fashion_mnist")
])
for intermediate_result in iterator:
first_worker_result = intermediate_result[0]
mlflow.log_metric("loss", first_worker_result["loss"])
print("Full losses for rank 0 worker: ", iterator.get_final_results())
print("Full losses for rank 0 worker: ", final_results)
if __name__ == "__main__":

View file

@ -0,0 +1,27 @@
from ray import train
from ray.train import Trainer
from ray.train.callbacks import MLflowLoggerCallback, TBXLoggerCallback
def train_func():
for i in range(3):
train.report(epoch=i)
trainer = Trainer(backend="torch", num_workers=2)
trainer.start()
# Run the training function, logging all the intermediate results
# to MLflow and TensorBoard.
result = trainer.run(
train_func,
callbacks=[
MLflowLoggerCallback(experiment_name="train_experiment"),
TBXLoggerCallback()
])
# Print the latest run directory and keep note of it.
# For example: /home/ray_results/train_2021-09-01_12-00-00/run_001
print("Run directory:", trainer.latest_run_dir)
trainer.shutdown()

View file

@ -15,6 +15,7 @@ from ray.train.constants import (TRAINING_ITERATION, DETAILED_AUTOFILLED_KEYS,
from ray.train.callbacks import JsonLoggerCallback, TBXLoggerCallback
from ray.train.backend import BackendConfig, Backend
from ray.train.worker_group import WorkerGroup
from ray.train.callbacks.logging import MLflowLoggerCallback
try:
from tensorflow.python.summary.summary_iterator \
@ -136,8 +137,6 @@ def _validate_tbx_result(events_dir):
assert len(results["hello/world"]) == 1
@pytest.mark.skipif(
summary_iterator is None, reason="tensorboard is not installed")
def test_TBX(ray_start_4_cpus, make_temp_dir):
config = TestConfig()
@ -159,6 +158,54 @@ def test_TBX(ray_start_4_cpus, make_temp_dir):
_validate_tbx_result(temp_dir)
def test_mlflow(ray_start_4_cpus, make_temp_dir):
config = TestConfig()
params = {"p1": "p1"}
temp_dir = make_temp_dir
num_workers = 4
def train_func(config):
train.report(episode_reward_mean=4)
train.report(episode_reward_mean=5)
train.report(episode_reward_mean=6)
return 1
callback = MLflowLoggerCallback(
experiment_name="test_exp", logdir=temp_dir)
trainer = Trainer(config, num_workers=num_workers)
trainer.start()
trainer.run(train_func, config=params, callbacks=[callback])
from mlflow.tracking import MlflowClient
client = MlflowClient(
tracking_uri=callback.mlflow_util._mlflow.get_tracking_uri())
all_runs = callback.mlflow_util._mlflow.search_runs(experiment_ids=["0"])
assert len(all_runs) == 1
# all_runs is a pandas dataframe.
all_runs = all_runs.to_dict(orient="records")
run_id = all_runs[0]["run_id"]
run = client.get_run(run_id)
assert run.data.params == params
assert "episode_reward_mean" in run.data.metrics and \
run.data.metrics["episode_reward_mean"] == 6.0
assert TRAINING_ITERATION in run.data.metrics and \
run.data.metrics[TRAINING_ITERATION] == 3.0
metric_history = client.get_metric_history(
run_id=run_id, key="episode_reward_mean")
assert len(metric_history) == 3
iterations = [metric.step for metric in metric_history]
assert iterations == [1, 2, 3]
rewards = [metric.value for metric in metric_history]
assert rewards == [4, 5, 6]
if __name__ == "__main__":
import pytest
import sys

View file

@ -281,7 +281,8 @@ class Trainer:
finished_with_errors = False
for callback in callbacks:
callback.start_training(logdir=self.latest_run_dir)
callback.start_training(
logdir=str(self.latest_run_dir), config=config or {})
train_func = self._get_train_func(train_func, config)

View file

@ -1,4 +1,3 @@
import os
from typing import Dict, Callable, Optional
import logging
@ -7,18 +6,12 @@ from ray.tune.trainable import Trainable
from ray.tune.logger import Logger, LoggerCallback
from ray.tune.result import TRAINING_ITERATION, TIMESTEPS_TOTAL
from ray.tune.trial import Trial
from ray.util.annotations import Deprecated
from ray.util.ml_utils.mlflow import MLflowLoggerUtil
logger = logging.getLogger(__name__)
def _import_mlflow():
try:
import mlflow
except ImportError:
mlflow = None
return mlflow
class MLflowLoggerCallback(LoggerCallback):
"""MLflow Logger to automatically log Tune results and config to MLflow.
@ -30,16 +23,13 @@ class MLflowLoggerCallback(LoggerCallback):
Args:
tracking_uri (str): The tracking URI for where to manage experiments
and runs. This can either be a local file path or a remote server.
This arg gets passed directly to mlflow.tracking.MlflowClient
This arg gets passed directly to mlflow
initialization. When using Tune in a multi-node setting, make sure
to set this to a remote server and not a local file path.
registry_uri (str): The registry URI that gets passed directly to
mlflow.tracking.MlflowClient initialization.
mlflow initialization.
experiment_name (str): The experiment name to use for this Tune run.
If None is passed in here, the Logger will automatically then
check the MLFLOW_EXPERIMENT_NAME and then the MLFLOW_EXPERIMENT_ID
environment variables to determine the experiment name.
If the experiment with the name already exists with MlFlow,
If the experiment with the name already exists with MLflow,
it will be reused. If not, a new experiment will be created with
that name.
tags (Dict): An optional dictionary of string keys and values to set
@ -84,7 +74,9 @@ class MLflowLoggerCallback(LoggerCallback):
self.registry_uri = registry_uri
self.experiment_name = experiment_name
self.tags = tags
self.save_artifact = save_artifact
self.should_save_artifact = save_artifact
self.mlflow_util = MLflowLoggerUtil()
if ray.util.client.ray.is_connected():
logger.warning("When using MLflowLoggerCallback with Ray Client, "
@ -94,55 +86,17 @@ class MLflowLoggerCallback(LoggerCallback):
"setup on the server side and not on the client "
"side.")
def setup(self):
mlflow = _import_mlflow()
if mlflow is None:
raise RuntimeError("MLflow has not been installed. Please `pip "
"install mlflow` to use the MLflowLogger.")
from mlflow.tracking import MlflowClient
self.client = MlflowClient(
tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
if self.experiment_name is None:
# If no name is passed in, then check env vars.
# First check if experiment_name env var is set.
self.experiment_name = os.environ.get("MLFLOW_EXPERIMENT_NAME")
if self.experiment_name is not None:
# First check if experiment with name exists.
experiment = self.client.get_experiment_by_name(
self.experiment_name)
if experiment is not None:
# If it already exists then get the id.
experiment_id = experiment.experiment_id
else:
# If it does not exist, create the experiment.
experiment_id = self.client.create_experiment(
name=self.experiment_name)
else:
# No experiment_name is passed in and name env var is not set.
# Now check the experiment id env var.
experiment_id = os.environ.get("MLFLOW_EXPERIMENT_ID")
# Confirm that an experiment with this id exists.
if experiment_id is None or self.client.get_experiment(
experiment_id) is None:
raise ValueError("No experiment_name passed, "
"MLFLOW_EXPERIMENT_NAME env var is not "
"set, and MLFLOW_EXPERIMENT_ID either "
"is not set or does not exist. Please "
"set one of these to use the "
"MLflowLoggerCallback.")
def setup(self, *args, **kwargs):
# Setup the mlflow logging util.
self.mlflow_util.setup_mlflow(
tracking_uri=self.tracking_uri,
registry_uri=self.registry_uri,
experiment_name=self.experiment_name)
if self.tags is None:
# Create empty dictionary for tags if not given explicitly
self.tags = {}
# At this point, experiment_id should be set.
self.experiment_id = experiment_id
self.save_artifact = self.save_artifact
self._trial_runs = {}
def log_trial_start(self, trial: "Trial"):
@ -153,44 +107,34 @@ class MLflowLoggerCallback(LoggerCallback):
tags = self.tags.copy()
tags["trial_name"] = str(trial)
run = self.client.create_run(
experiment_id=self.experiment_id, tags=tags)
run = self.mlflow_util.start_run(tags=tags, run_name=str(trial))
self._trial_runs[trial] = run.info.run_id
run_id = self._trial_runs[trial]
# Log the config parameters.
config = trial.config
for key, value in config.items():
self.client.log_param(run_id=run_id, key=key, value=value)
self.mlflow_util.log_params(run_id=run_id, params_to_log=config)
def log_trial_result(self, iteration: int, trial: "Trial", result: Dict):
step = result.get(TIMESTEPS_TOTAL) or result[TRAINING_ITERATION]
run_id = self._trial_runs[trial]
for key, value in result.items():
try:
value = float(value)
except (ValueError, TypeError):
logger.debug("Cannot log key {} with value {} since the "
"value cannot be converted to float.".format(
key, value))
continue
self.client.log_metric(
run_id=run_id, key=key, value=value, step=step)
self.mlflow_util.log_metrics(
run_id=run_id, metrics_to_log=result, step=step)
def log_trial_end(self, trial: "Trial", failed: bool = False):
run_id = self._trial_runs[trial]
# Log the artifacts if save_artifact is set to True.
if self.save_artifact:
self.client.log_artifacts(run_id, local_dir=trial.logdir)
if self.should_save_artifact:
self.mlflow_util.save_artifacts(run_id=run_id, dir=trial.logdir)
# Stop the run once trial finishes.
status = "FINISHED" if not failed else "FAILED"
self.client.set_terminated(run_id=run_id, status=status)
self.mlflow_util.end_run(run_id=run_id, status=status)
@Deprecated
class MLflowLogger(Logger):
"""MLflow logger using the deprecated Logger API.
@ -198,42 +142,10 @@ class MLflowLogger(Logger):
or manually set the proper environment variables.
"""
_experiment_logger_cls = MLflowLoggerCallback
def _init(self):
mlflow = _import_mlflow()
logger_config = self.config.pop("logger_config", {})
tracking_uri = logger_config.get("mlflow_tracking_uri")
registry_uri = logger_config.get("mlflow_registry_uri")
experiment_id = logger_config.get("mlflow_experiment_id")
if experiment_id is None or not mlflow.get_experiment(experiment_id):
raise ValueError(
"You must provide a valid `mlflow_experiment_id` "
"in your `logger_config` dict in the `config` "
"dict passed to `tune.run`. "
"Are you sure you passed in a `experiment_id` and "
"the experiment exists?")
else:
experiment_name = mlflow.get_experiment(experiment_id).name
self._trial_experiment_logger = self._experiment_logger_cls(
tracking_uri, registry_uri, experiment_name)
self._trial_experiment_logger.setup()
self._trial_experiment_logger.log_trial_start(self.trial)
def on_result(self, result: Dict):
self._trial_experiment_logger.log_trial_result(
iteration=result.get(TRAINING_ITERATION),
trial=self.trial,
result=result)
def close(self):
self._trial_experiment_logger.log_trial_end(
trial=self.trial, failed=False)
del self._trial_experiment_logger
raise DeprecationWarning("The legacy MLflowLogger has been "
"deprecated. Use the MLflowLoggerCallback "
"instead.")
def mlflow_mixin(func: Callable):
@ -330,9 +242,6 @@ def mlflow_mixin(func: Callable):
}
})
"""
if _import_mlflow() is None:
raise RuntimeError("MLflow has not been installed. Please `pip "
"install mlflow` to use the mlflow_mixin.")
if ray.util.client.ray.is_connected():
logger.warning("When using mlflow_mixin with Ray Client, "
"it is recommended to use a remote tracking "
@ -349,11 +258,7 @@ def mlflow_mixin(func: Callable):
class MLflowTrainableMixin:
def __init__(self, config: Dict, *args, **kwargs):
self._mlflow = _import_mlflow()
if self._mlflow is None:
raise RuntimeError("MLflow has not been installed. Please `pip "
"install mlflow` to use the mlflow_mixin.")
self.mlflow_util = MLflowLoggerUtil()
if not isinstance(self, Trainable):
raise ValueError(
@ -380,47 +285,29 @@ class MLflowTrainableMixin:
"passed in. Make sure to include a `mlflow` "
"key in your `config` dict containing at "
"least a `tracking_uri`")
self._mlflow.set_tracking_uri(tracking_uri)
# Set the tracking token if one is passed in.
tracking_token = mlflow_config.pop("token", None)
if tracking_token is not None:
os.environ["MLFLOW_TRACKING_TOKEN"] = tracking_token
# First see if experiment_id is passed in.
experiment_id = mlflow_config.pop("experiment_id", None)
if experiment_id is None or self._mlflow.get_experiment(
experiment_id) is None:
logger.debug("Either no experiment_id is passed in, or the "
"experiment with the given id does not exist. "
"Checking experiment_name")
# Check for name.
experiment_name = mlflow_config.pop("experiment_name", None)
if experiment_name is None:
raise ValueError(
"MLflow mixin specified but no "
"experiment_name or experiment_id has been "
"passed in. Make sure to include a `mlflow` "
"key in your `config` dict containing at "
"least a `experiment_name` or `experiment_id` "
"specification.")
experiment = self._mlflow.get_experiment_by_name(experiment_name)
if experiment is not None:
# Experiment with this name exists.
experiment_id = experiment.experiment_id
else:
raise ValueError("No experiment with the given "
"name: {} or id: {} currently exists. Make "
"sure to first start the MLflow experiment "
"before calling tune.run.".format(
experiment_name, experiment_id))
self.experiment_id = experiment_id
experiment_name = mlflow_config.pop("experiment_name", None)
# This initialization happens in each of the Trainables/workers.
# So we have to set `create_experiment_if_not_exists` to False.
# Otherwise there might be race conditions when each worker tries to
# create the same experiment.
# For the mixin, the experiment must be created beforehand.
self.mlflow_util.setup_mlflow(
tracking_uri=tracking_uri,
experiment_id=experiment_id,
experiment_name=experiment_name,
tracking_token=tracking_token,
create_experiment_if_not_exists=False)
run_name = self.trial_name + "_" + self.trial_id
run_name = run_name.replace("/", "_")
self._mlflow.start_run(
experiment_id=self.experiment_id, run_name=run_name)
self.mlflow_util.start_run(set_active=True, run_name=run_name)
def stop(self):
self._mlflow.end_run()
self.mlflow_util.end_run()

View file

@ -1,12 +1,16 @@
import os
import tempfile
import unittest
from collections import namedtuple
from unittest.mock import patch
from ray.tune.function_runner import wrap_function
from ray.tune.integration.mlflow import MLflowLoggerCallback, MLflowLogger, \
from ray.tune.integration.mlflow import MLflowLoggerCallback, \
mlflow_mixin, MLflowTrainableMixin
from mlflow.tracking import MlflowClient
from ray.util.ml_utils.mlflow import MLflowLoggerUtil
class MockTrial(
namedtuple("MockTrial",
@ -18,100 +22,10 @@ class MockTrial(
return self.trial_name
MockRunInfo = namedtuple("MockRunInfo", ["run_id"])
class MockRun:
def __init__(self, run_id, tags=None):
self.run_id = run_id
self.tags = tags
self.info = MockRunInfo(run_id)
self.params = []
self.metrics = []
self.artifacts = []
def log_param(self, key, value):
self.params.append({key: value})
def log_metric(self, key, value):
self.metrics.append({key: value})
def log_artifact(self, artifact):
self.artifacts.append(artifact)
def set_terminated(self, status):
self.terminated = True
self.status = status
MockExperiment = namedtuple("MockExperiment", ["name", "experiment_id"])
class MockMlflowClient:
def __init__(self, tracking_uri=None, registry_uri=None):
self.tracking_uri = tracking_uri
self.registry_uri = registry_uri
self.experiments = [MockExperiment("existing_experiment", 0)]
self.runs = {0: []}
self.active_run = None
def set_tracking_uri(self, tracking_uri):
self.tracking_uri = tracking_uri
def get_experiment_by_name(self, name):
try:
index = self.experiment_names.index(name)
return self.experiments[index]
except ValueError:
return None
def get_experiment(self, experiment_id):
experiment_id = int(experiment_id)
try:
return self.experiments[experiment_id]
except IndexError:
return None
def create_experiment(self, name):
experiment_id = len(self.experiments)
self.experiments.append(MockExperiment(name, experiment_id))
self.runs[experiment_id] = []
return experiment_id
def create_run(self, experiment_id, tags=None):
experiment_runs = self.runs[experiment_id]
run_id = (experiment_id, len(experiment_runs))
run = MockRun(run_id=run_id, tags=tags)
experiment_runs.append(run)
return run
def start_run(self, experiment_id, run_name):
# Creates new run and sets it as active.
run = self.create_run(experiment_id)
self.active_run = run
def get_mock_run(self, run_id):
return self.runs[run_id[0]][run_id[1]]
def log_param(self, run_id, key, value):
run = self.get_mock_run(run_id)
run.log_param(key, value)
def log_metric(self, run_id, key, value, step):
run = self.get_mock_run(run_id)
run.log_metric(key, value)
def log_artifacts(self, run_id, local_dir):
run = self.get_mock_run(run_id)
run.log_artifact(local_dir)
def set_terminated(self, run_id, status):
run = self.get_mock_run(run_id)
run.set_terminated(status)
@property
def experiment_names(self):
return [e.name for e in self.experiments]
class MockMLflowLoggerUtil(MLflowLoggerUtil):
def save_artifacts(self, dir, run_id):
self.artifact_saved = True
self.artifact_info = {"dir": dir, "run_id": run_id}
def clear_env_vars():
@ -122,87 +36,104 @@ def clear_env_vars():
class MLflowTest(unittest.TestCase):
@patch("mlflow.tracking.MlflowClient", MockMlflowClient)
def setUp(self):
self.tracking_uri = tempfile.mkdtemp()
self.registry_uri = tempfile.mkdtemp()
client = MlflowClient(
tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
client.create_experiment(name="existing_experiment")
assert client.get_experiment_by_name(
"existing_experiment").experiment_id == "0"
def testMlFlowLoggerCallbackConfig(self):
# Explicitly pass in all args.
logger = MLflowLoggerCallback(
tracking_uri="test1",
registry_uri="test2",
tracking_uri=self.tracking_uri,
registry_uri=self.registry_uri,
experiment_name="test_exp")
logger.setup()
self.assertEqual(logger.client.tracking_uri, "test1")
self.assertEqual(logger.client.registry_uri, "test2")
self.assertListEqual(logger.client.experiment_names,
["existing_experiment", "test_exp"])
self.assertEqual(logger.experiment_id, 1)
self.assertEqual(logger.mlflow_util._mlflow.get_tracking_uri(),
self.tracking_uri)
self.assertEqual(logger.mlflow_util._mlflow.get_registry_uri(),
self.registry_uri)
self.assertListEqual(
[e.name for e in logger.mlflow_util._mlflow.list_experiments()],
["existing_experiment", "test_exp"])
self.assertEqual(logger.mlflow_util.experiment_id, "1")
# Check if client recognizes already existing experiment.
logger = MLflowLoggerCallback(experiment_name="existing_experiment")
logger = MLflowLoggerCallback(
experiment_name="existing_experiment",
tracking_uri=self.tracking_uri,
registry_uri=self.registry_uri)
logger.setup()
self.assertListEqual(logger.client.experiment_names,
["existing_experiment"])
self.assertEqual(logger.experiment_id, 0)
self.assertEqual(logger.mlflow_util.experiment_id, "0")
# Pass in experiment name as env var.
clear_env_vars()
os.environ["MLFLOW_EXPERIMENT_NAME"] = "test_exp"
logger = MLflowLoggerCallback()
logger = MLflowLoggerCallback(
tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
logger.setup()
self.assertListEqual(logger.client.experiment_names,
["existing_experiment", "test_exp"])
self.assertEqual(logger.experiment_id, 1)
self.assertEqual(logger.mlflow_util.experiment_id, "1")
# Pass in existing experiment name as env var.
clear_env_vars()
os.environ["MLFLOW_EXPERIMENT_NAME"] = "existing_experiment"
logger = MLflowLoggerCallback()
logger = MLflowLoggerCallback(
tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
logger.setup()
self.assertListEqual(logger.client.experiment_names,
["existing_experiment"])
self.assertEqual(logger.experiment_id, 0)
self.assertEqual(logger.mlflow_util.experiment_id, "0")
# Pass in existing experiment id as env var.
clear_env_vars()
os.environ["MLFLOW_EXPERIMENT_ID"] = "0"
logger = MLflowLoggerCallback()
logger = MLflowLoggerCallback(
tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
logger.setup()
self.assertListEqual(logger.client.experiment_names,
["existing_experiment"])
self.assertEqual(logger.experiment_id, "0")
self.assertEqual(logger.mlflow_util.experiment_id, "0")
# Pass in non existing experiment id as env var.
# This should create a new experiment.
clear_env_vars()
os.environ["MLFLOW_EXPERIMENT_ID"] = "500"
with self.assertRaises(ValueError):
logger = MLflowLoggerCallback()
logger = MLflowLoggerCallback(
tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
logger.setup()
# Experiment name env var should take precedence over id env var.
# Experiment id env var should take precedence over name env var.
clear_env_vars()
os.environ["MLFLOW_EXPERIMENT_NAME"] = "test_exp"
os.environ["MLFLOW_EXPERIMENT_ID"] = "0"
logger = MLflowLoggerCallback()
logger = MLflowLoggerCallback(
tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
logger.setup()
self.assertListEqual(logger.client.experiment_names,
["existing_experiment", "test_exp"])
self.assertEqual(logger.experiment_id, 1)
self.assertEqual(logger.mlflow_util.experiment_id, "0")
# Using tags
tags = {"user_name": "John", "git_commit_hash": "abc123"}
clear_env_vars()
os.environ["MLFLOW_EXPERIMENT_NAME"] = "test_tags"
os.environ["MLFLOW_EXPERIMENT_ID"] = "0"
logger = MLflowLoggerCallback(tags=tags)
logger = MLflowLoggerCallback(
tracking_uri=self.tracking_uri,
registry_uri=self.registry_uri,
tags=tags)
logger.setup()
self.assertEqual(logger.tags, tags)
@patch("mlflow.tracking.MlflowClient", MockMlflowClient)
@patch("ray.tune.integration.mlflow.MLflowLoggerUtil",
MockMLflowLoggerUtil)
def testMlFlowLoggerLogging(self):
clear_env_vars()
trial_config = {"par1": 4, "par2": 9.0}
trial_config = {"par1": "a", "par2": "b"}
trial = MockTrial(trial_config, "trial1", 0, "artifact")
logger = MLflowLoggerCallback(
tracking_uri=self.tracking_uri,
registry_uri=self.registry_uri,
experiment_name="test1",
save_artifact=True,
tags={"hello": "world"})
@ -210,20 +141,24 @@ class MLflowTest(unittest.TestCase):
# Check if run is created with proper tags.
logger.on_trial_start(iteration=0, trials=[], trial=trial)
# New run should be created for this trial with correct tag.
mock_run = logger.client.runs[1][0]
self.assertDictEqual(mock_run.tags, {
all_runs = logger.mlflow_util._mlflow.search_runs(experiment_ids=["1"])
self.assertEqual(len(all_runs), 1)
# all_runs is a pandas dataframe.
all_runs = all_runs.to_dict(orient="records")
run = logger.mlflow_util._mlflow.get_run(all_runs[0]["run_id"])
self.assertDictEqual(run.data.tags, {
"hello": "world",
"trial_name": "trial1"
"trial_name": "trial1",
"mlflow.runName": "trial1"
})
self.assertTupleEqual(mock_run.run_id, (1, 0))
self.assertTupleEqual(logger._trial_runs[trial], mock_run.run_id)
self.assertEqual(logger._trial_runs[trial], run.info.run_id)
# Params should be logged.
self.assertListEqual(mock_run.params, [{"par1": 4}, {"par2": 9}])
self.assertDictEqual(run.data.params, trial_config)
# When same trial is started again, new run should not be created.
logger.on_trial_start(iteration=0, trials=[], trial=trial)
self.assertEqual(len(logger.client.runs[1]), 1)
all_runs = logger.mlflow_util._mlflow.search_runs(experiment_ids=["1"])
self.assertEqual(len(all_runs), 1)
# Check metrics are logged properly.
result = {
@ -233,55 +168,22 @@ class MLflowTest(unittest.TestCase):
"training_iteration": 0
}
logger.on_trial_result(0, [], trial, result)
mock_run = logger.client.runs[1][0]
run = logger.mlflow_util._mlflow.get_run(run_id=run.info.run_id)
# metric3 is not logged since it cannot be converted to float.
self.assertListEqual(mock_run.metrics, [{
"metric1": 0.8
}, {
"metric2": 1.0
}, {
self.assertDictEqual(run.data.metrics, {
"metric1": 0.8,
"metric2": 1.0,
"training_iteration": 0
}])
})
# Check that artifact is logged on termination.
logger.on_trial_complete(0, [], trial)
mock_run = logger.client.runs[1][0]
self.assertListEqual(mock_run.artifacts, ["artifact"])
self.assertTrue(mock_run.terminated)
self.assertEqual(mock_run.status, "FINISHED")
self.assertTrue(logger.mlflow_util.artifact_saved)
self.assertDictEqual(logger.mlflow_util.artifact_info, {
"dir": "artifact",
"run_id": run.info.run_id
})
@patch("mlflow.tracking.MlflowClient", MockMlflowClient)
def testMlFlowLegacyLoggerConfig(self):
mlflow = MockMlflowClient()
with patch.dict("sys.modules", mlflow=mlflow):
clear_env_vars()
trial_config = {"par1": 4, "par2": 9.0}
trial = MockTrial(trial_config, "trial1", 0, "artifact")
# No experiment_id is passed in config, should raise an error.
with self.assertRaises(ValueError):
logger = MLflowLogger(trial_config, "/tmp", trial)
trial_config.update({
"logger_config": {
"mlflow_tracking_uri": "test_tracking_uri",
"mlflow_experiment_id": 0
}
})
trial = MockTrial(trial_config, "trial2", 1, "artifact")
logger = MLflowLogger(trial_config, "/tmp", trial)
experiment_logger = logger._trial_experiment_logger
client = experiment_logger.client
self.assertEqual(client.tracking_uri, "test_tracking_uri")
# Check to make sure that a run was created on experiment_id 0.
self.assertEqual(len(client.runs[0]), 1)
mock_run = client.runs[0][0]
self.assertDictEqual(mock_run.tags, {"trial_name": "trial2"})
self.assertListEqual(mock_run.params, [{"par1": 4}, {"par2": 9}])
@patch("ray.tune.integration.mlflow._import_mlflow",
lambda: MockMlflowClient())
def testMlFlowMixinConfig(self):
clear_env_vars()
trial_config = {"par1": 4, "par2": 9.0}
@ -294,40 +196,24 @@ class MLflowTest(unittest.TestCase):
# No MLflow config passed in.
with self.assertRaises(ValueError):
wrapped = wrap_function(train_fn)(trial_config)
wrap_function(train_fn)(trial_config)
trial_config.update({"mlflow": {}})
# No tracking uri or experiment_id/name passed in.
with self.assertRaises(ValueError):
wrapped = wrap_function(train_fn)(trial_config)
wrap_function(train_fn)(trial_config)
# Invalid experiment-id
trial_config["mlflow"].update({"experiment_id": "500"})
# Invalid experiment_id and still no tracking_uri passed in.
with self.assertRaises(ValueError):
wrapped = wrap_function(train_fn)(trial_config)
wrap_function(train_fn)(trial_config)
trial_config["mlflow"].update({
"tracking_uri": "test_tracking_uri",
"experiment_name": "existing_experiment"
})
wrapped = wrap_function(train_fn)(trial_config)
client = wrapped._mlflow
self.assertEqual(client.tracking_uri, "test_tracking_uri")
self.assertTupleEqual(client.active_run.run_id, (0, 0))
with patch("ray.tune.integration.mlflow._import_mlflow",
lambda: client):
train_fn.__mixins__ = (MLflowTrainableMixin, )
wrapped = wrap_function(train_fn)(trial_config)
client = wrapped._mlflow
self.assertTupleEqual(client.active_run.run_id, (0, 1))
# Set to experiment that does not already exist.
# New experiment should be created.
trial_config["mlflow"]["experiment_name"] = "new_experiment"
with self.assertRaises(ValueError):
wrapped = wrap_function(train_fn)(trial_config)
# Set to an experiment that does not already exist.
# The mixin requires the experiment to exist, so this should raise.
trial_config["mlflow"]["experiment_name"] = "new_experiment"
with self.assertRaises(ValueError):
wrap_function(train_fn)(trial_config)
if __name__ == "__main__":

View file

@ -0,0 +1,18 @@
# --------------------------------------------------------------------
# Tests from the python/ray/util/ml_utils/tests directory.
# Please keep these sorted alphabetically.
# --------------------------------------------------------------------
py_test(
name = "test_mlflow",
size = "medium",
srcs = ["tests/test_mlflow.py"],
tags = ["team:ml", "exclusive"],
deps = [":ml_util_lib"]
)
# This is a dummy test dependency that causes the above tests to be
# re-run if any of these files changes.
py_library(
name = "ml_util_lib",
srcs = glob(["**/*.py"], exclude=["tests/*.py"]),
)

View file

@ -0,0 +1,284 @@
import os
import logging
from typing import Dict, Optional, TYPE_CHECKING
if TYPE_CHECKING:
from mlflow.entities import Run
from mlflow.tracking import MlflowClient
logger = logging.getLogger(__name__)
class MLflowLoggerUtil:
"""Util class for setting up and logging to MLflow.
Use this util for any library that needs MLflow logging/tracking logic
such as Ray Tune or Ray Train.
"""
def __init__(self):
import mlflow
self._mlflow = mlflow
self.experiment_id = None
def setup_mlflow(
self,
tracking_uri: Optional[str] = None,
registry_uri: Optional[str] = None,
experiment_id: Optional[str] = None,
experiment_name: Optional[str] = None,
tracking_token=None,
create_experiment_if_not_exists: bool = True,
):
"""
Sets up MLflow.
Sets the MLflow tracking URI, tracking token, and registry URI. Also sets
the MLflow experiment that the logger should use, and possibly
creates a new experiment if it does not exist.
Args:
tracking_uri (str): The tracking URI for the MLflow tracking
server.
registry_uri (str): The registry URI for the MLflow model registry.
experiment_id (str): The id of an already existing MLflow
experiment to use for logging. If None is passed in
here and the MLFLOW_EXPERIMENT_ID environment variable is not set,
or the
experiment with this id does not exist,
``experiment_name`` will be used instead. This argument takes
precedence over ``experiment_name`` if both are passed in.
experiment_name (str): The experiment name to use for logging.
If None is passed in here, the MLFLOW_EXPERIMENT_NAME
environment variable is used to determine the experiment name.
If the experiment with the name already exists with MLflow,
it will be reused. If not, a new experiment will be created
with the provided name if
``create_experiment_if_not_exists`` is set to True.
create_experiment_if_not_exists (bool): Whether to create an
experiment with the provided name if it does not already
exist. Defaults to True.
Raises:
ValueError: If no existing experiment could be found for the
provided ``experiment_id`` or ``experiment_name``, and a new
experiment could not be created.
"""
if tracking_token:
os.environ["MLFLOW_TRACKING_TOKEN"] = tracking_token
self._mlflow.set_tracking_uri(tracking_uri)
self._mlflow.set_registry_uri(registry_uri)
# First check experiment_id.
experiment_id = experiment_id if experiment_id is not None else \
os.environ.get("MLFLOW_EXPERIMENT_ID")
if experiment_id is not None:
from mlflow.exceptions import MlflowException
try:
self._mlflow.get_experiment(experiment_id=experiment_id)
logger.debug(f"Experiment with provided id {experiment_id} "
"exists. Setting that as the experiment.")
self.experiment_id = experiment_id
return
except MlflowException:
pass
# Then check experiment_name.
experiment_name = experiment_name if experiment_name is not None else \
os.environ.get("MLFLOW_EXPERIMENT_NAME")
if experiment_name is not None and self._mlflow.get_experiment_by_name(
name=experiment_name):
logger.debug(f"Experiment with provided name {experiment_name} "
"exists. Setting that as the experiment.")
self.experiment_id = self._mlflow.get_experiment_by_name(
experiment_name).experiment_id
return
# An experiment with the provided id or name does not exist.
# Create a new experiment if applicable.
if experiment_name and create_experiment_if_not_exists:
logger.debug("Existing experiment not found. Creating new "
f"experiment with name: {experiment_name}")
self.experiment_id = self._mlflow.create_experiment(
name=experiment_name)
return
if create_experiment_if_not_exists:
raise ValueError(f"Experiment with the provided experiment_id: "
f"{experiment_id} does not exist and no "
f"experiment_name provided. At least one of "
f"these has to be provided.")
else:
raise ValueError(f"Experiment with the provided experiment_id: "
f"{experiment_id} or experiment_name: "
f"{experiment_name} does not exist. Please "
f"create an MLflow experiment and provide "
f"either its id or name.")
def _parse_dict(self, dict_to_log: Dict) -> Dict:
"""Parses provided dict to convert all values to float.
MLflow can only log metrics that are floats. This does not apply to
logging parameters or artifacts.
Args:
dict_to_log (Dict): The dictionary containing the metrics to log.
Returns:
A dictionary containing the metrics to log with all values being
converted to floats, or skipped if not able to be converted.
"""
new_dict = {}
for key, value in dict_to_log.items():
try:
value = float(value)
new_dict[key] = value
except (ValueError, TypeError):
logger.debug("Cannot log key {} with value {} since the "
"value cannot be converted to float.".format(
key, value))
continue
return new_dict
def start_run(self,
run_name: Optional[str] = None,
tags: Optional[Dict] = None,
set_active: bool = False) -> "Run":
"""Starts a new run and possibly sets it as the active run.
Args:
run_name (Optional[str]): The name to set for the new run.
tags (Optional[Dict]): Tags to set for the new run.
set_active (bool): Whether to set the new run as the active run.
If an active run already exists, then that run is returned.
Returns:
The newly created MLflow run.
"""
if set_active:
return self._start_active_run(run_name=run_name, tags=tags)
from mlflow.utils.mlflow_tags import MLFLOW_RUN_NAME
client = self._get_client()
# ``tags`` may be None (e.g. when ``start_run()`` is called with
# defaults), so guard before attaching the run name tag.
tags = dict(tags) if tags else {}
tags[MLFLOW_RUN_NAME] = run_name
run = client.create_run(experiment_id=self.experiment_id, tags=tags)
return run
def _start_active_run(self,
run_name: Optional[str] = None,
tags: Optional[Dict] = None) -> "Run":
"""Starts a run and sets it as the active run if one does not exist.
If an active run already exists, then returns it.
"""
active_run = self._mlflow.active_run()
if active_run:
return active_run
return self._mlflow.start_run(run_name=run_name, tags=tags)
def _run_exists(self, run_id: str) -> bool:
"""Check if run with the provided id exists."""
from mlflow.exceptions import MlflowException
try:
self._mlflow.get_run(run_id=run_id)
return True
except MlflowException:
return False
def _get_client(self) -> "MlflowClient":
"""Returns an ml.tracking.MlflowClient instance to use for logging."""
tracking_uri = self._mlflow.get_tracking_uri()
registry_uri = self._mlflow.get_registry_uri()
from mlflow.tracking import MlflowClient
return MlflowClient(
tracking_uri=tracking_uri, registry_uri=registry_uri)
def log_params(self, params_to_log: Dict, run_id: Optional[str] = None):
"""Logs the provided parameters to the run specified by run_id.
If no ``run_id`` is passed in, then logs to the current active run.
If there is no active run, then creates a new run and sets it as
the active run.
Args:
params_to_log (Dict): Dictionary of parameters to log.
run_id (Optional[str]): The ID of the run to log to.
"""
if run_id and self._run_exists(run_id):
client = self._get_client()
for key, value in params_to_log.items():
client.log_param(run_id=run_id, key=key, value=value)
else:
for key, value in params_to_log.items():
self._mlflow.log_param(key=key, value=value)
def log_metrics(self,
step: int,
metrics_to_log: Dict,
run_id: Optional[str] = None):
"""Logs the provided metrics to the run specified by run_id.
If no ``run_id`` is passed in, then logs to the current active run.
If there is no active run, then creates a new run and sets it as
the active run.
Args:
step (int): The step at which to log the metrics.
metrics_to_log (Dict): Dictionary of metrics to log.
run_id (Optional[str]): The ID of the run to log to.
"""
metrics_to_log = self._parse_dict(metrics_to_log)
if run_id and self._run_exists(run_id):
client = self._get_client()
for key, value in metrics_to_log.items():
client.log_metric(
run_id=run_id, key=key, value=value, step=step)
else:
for key, value in metrics_to_log.items():
self._mlflow.log_metric(key=key, value=value, step=step)
def save_artifacts(self, dir: str, run_id: Optional[str] = None):
"""Saves directory as artifact to the run specified by run_id.
If no ``run_id`` is passed in, then saves to the current active run.
If there is no active run, then creates a new run and sets it as
the active run.
Args:
dir (str): Path to directory containing the files to save.
run_id (Optional[str]): The ID of the run to log to.
"""
if run_id and self._run_exists(run_id):
client = self._get_client()
client.log_artifacts(run_id=run_id, local_dir=dir)
else:
self._mlflow.log_artifacts(local_dir=dir)
def end_run(self, status: Optional[str] = None,
run_id: Optional[str] = None):
"""Terminates the run specified by run_id.
If no ``run_id`` is passed in, then terminates the
active run if one exists.
Args:
status (Optional[str]): The status to set when terminating the run.
run_id (Optional[str]): The ID of the run to terminate.
"""
if run_id and self._run_exists(run_id) and not (
self._mlflow.active_run()
and self._mlflow.active_run().info.run_id == run_id):
client = self._get_client()
client.set_terminated(run_id=run_id, status=status)
else:
self._mlflow.end_run(status=status)
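# Illustrative sketch (not part of this file): minimal end-to-end usage of
# ``MLflowLoggerUtil``. The tracking URI, experiment name, run name, and
# logged values are placeholders; mlflow must be installed for this to run.
if __name__ == "__main__":
    util = MLflowLoggerUtil()
    # Point MLflow at a local directory; the experiment is created if missing.
    util.setup_mlflow(
        tracking_uri="/tmp/mlruns", experiment_name="demo_experiment")
    # Create a run in that experiment and log params/metrics to it by run id.
    run = util.start_run(run_name="demo_run", tags={})
    run_id = run.info.run_id
    util.log_params(params_to_log={"lr": 1e-3}, run_id=run_id)
    util.log_metrics(metrics_to_log={"loss": 0.5}, step=1, run_id=run_id)
    util.end_run(status="FINISHED", run_id=run_id)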

View file

@ -0,0 +1,112 @@
import os
import shutil
import tempfile
import unittest
from ray.util.ml_utils.mlflow import MLflowLoggerUtil
class MLflowTest(unittest.TestCase):
def setUp(self):
self.dirpath = tempfile.mkdtemp()
import mlflow
mlflow.set_tracking_uri(self.dirpath)
mlflow.create_experiment(name="existing_experiment")
self.mlflow_util = MLflowLoggerUtil()
self.tracking_uri = mlflow.get_tracking_uri()
def tearDown(self):
shutil.rmtree(self.dirpath)
def test_experiment_id(self):
self.mlflow_util.setup_mlflow(
tracking_uri=self.tracking_uri, experiment_id="0")
assert self.mlflow_util.experiment_id == "0"
def test_experiment_id_env_var(self):
os.environ["MLFLOW_EXPERIMENT_ID"] = "0"
self.mlflow_util.setup_mlflow(tracking_uri=self.tracking_uri)
assert self.mlflow_util.experiment_id == "0"
del os.environ["MLFLOW_EXPERIMENT_ID"]
def test_experiment_name(self):
self.mlflow_util.setup_mlflow(
tracking_uri=self.tracking_uri,
experiment_name="existing_experiment")
assert self.mlflow_util.experiment_id == "0"
def test_experiment_name_env_var(self):
os.environ["MLFLOW_EXPERIMENT_NAME"] = "existing_experiment"
self.mlflow_util.setup_mlflow(tracking_uri=self.tracking_uri)
assert self.mlflow_util.experiment_id == "0"
del os.environ["MLFLOW_EXPERIMENT_NAME"]
def test_id_precedence(self):
os.environ["MLFLOW_EXPERIMENT_ID"] = "0"
self.mlflow_util.setup_mlflow(
tracking_uri=self.tracking_uri, experiment_name="new_experiment")
assert self.mlflow_util.experiment_id == "0"
del os.environ["MLFLOW_EXPERIMENT_ID"]
def test_new_experiment(self):
self.mlflow_util.setup_mlflow(
tracking_uri=self.tracking_uri, experiment_name="new_experiment")
assert self.mlflow_util.experiment_id == "1"
def test_setup_fail(self):
with self.assertRaises(ValueError):
self.mlflow_util.setup_mlflow(
tracking_uri=self.tracking_uri,
experiment_name="new_experiment2",
create_experiment_if_not_exists=False)
def test_log_params(self):
params = {"a": "a"}
self.mlflow_util.setup_mlflow(
tracking_uri=self.tracking_uri, experiment_name="new_experiment")
run = self.mlflow_util.start_run()
run_id = run.info.run_id
self.mlflow_util.log_params(params_to_log=params, run_id=run_id)
run = self.mlflow_util._mlflow.get_run(run_id=run_id)
assert run.data.params == params
params2 = {"b": "b"}
self.mlflow_util.start_run(set_active=True)
self.mlflow_util.log_params(params_to_log=params2, run_id=run_id)
assert self.mlflow_util._mlflow.get_run(run_id=run_id).data.params == {
**params,
**params2
}
self.mlflow_util.end_run()
def test_log_metrics(self):
metrics = {"a": 1.0}
self.mlflow_util.setup_mlflow(
tracking_uri=self.tracking_uri, experiment_name="new_experiment")
run = self.mlflow_util.start_run()
run_id = run.info.run_id
self.mlflow_util.log_metrics(
metrics_to_log=metrics, run_id=run_id, step=0)
run = self.mlflow_util._mlflow.get_run(run_id=run_id)
assert run.data.metrics == metrics
metrics2 = {"b": 1.0}
self.mlflow_util.start_run(set_active=True)
self.mlflow_util.log_metrics(
metrics_to_log=metrics2, run_id=run_id, step=0)
assert self.mlflow_util._mlflow.get_run(
run_id=run_id).data.metrics == {
**metrics,
**metrics2
}
self.mlflow_util.end_run()
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))

View file

@ -1,6 +1,7 @@
-r requirements_dl.txt
mlflow==1.21.0
tensorboardX==2.4.1
# Dependencies for Hugging Face examples:
# `python/ray/train/examples/transformers/transformers_example.py`