diff --git a/python/ray/_private/runtime_env/constants.py b/python/ray/_private/runtime_env/constants.py index 24fa9cea9..cd7a1ece6 100644 --- a/python/ray/_private/runtime_env/constants.py +++ b/python/ray/_private/runtime_env/constants.py @@ -3,3 +3,9 @@ RAY_JOB_CONFIG_JSON_ENV_VAR = "RAY_JOB_CONFIG_JSON_ENV_VAR" # The plugins which should be loaded when ray cluster starts. RAY_RUNTIME_ENV_PLUGINS_ENV_VAR = "RAY_RUNTIME_ENV_PLUGINS" + +# The schema files or directories of plugins which should be loaded in workers. +RAY_RUNTIME_ENV_PLUGIN_SCHEMAS_ENV_VAR = "RAY_RUNTIME_ENV_PLUGIN_SCHEMAS" + +# The file suffix of runtime env plugin schemas. +RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX = ".json" diff --git a/python/ray/_private/runtime_env/plugin.py b/python/ray/_private/runtime_env/plugin.py index 03aee9fb3..9bf63945a 100644 --- a/python/ray/_private/runtime_env/plugin.py +++ b/python/ray/_private/runtime_env/plugin.py @@ -92,7 +92,7 @@ class RuntimeEnvPlugin(ABC): class RuntimeEnvPluginManager: - """This mananger is used to load plugins in runtime env agent.""" + """This manager is used to load plugins in runtime env agent.""" def __init__(self): self.plugins = {} diff --git a/python/ray/_private/runtime_env/plugin_schema_manager.py b/python/ray/_private/runtime_env/plugin_schema_manager.py new file mode 100644 index 000000000..1ff52f0e2 --- /dev/null +++ b/python/ray/_private/runtime_env/plugin_schema_manager.py @@ -0,0 +1,88 @@ +import os +import jsonschema +import logging +from typing import List +import json +from ray._private.runtime_env.constants import ( + RAY_RUNTIME_ENV_PLUGIN_SCHEMAS_ENV_VAR, + RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX, +) + +logger = logging.getLogger(__name__) + + +class RuntimeEnvPluginSchemaManager: + """This manager is used to load plugin json schemas.""" + + default_schema_path = os.path.join(os.path.dirname(__file__), "schemas") + schemas = {} + loaded = False + + @classmethod + def _load_schemas(cls, schema_paths: List[str]): + for schema_path in schema_paths: + try: + schema = json.load(open(schema_path)) + except json.decoder.JSONDecodeError: + logger.error("Invalid runtime env schema %s, skip it.", schema_path) + if "title" not in schema: + logger.error( + "No valid title in runtime env schema %s, skip it.", schema_path + ) + continue + if schema["title"] in cls.schemas: + logger.error( + "The 'title' of runtime env schema %s conflicts with %s, skip it.", + schema_path, + cls.schemas[schema["title"]], + ) + continue + cls.schemas[schema["title"]] = schema + + @classmethod + def _load_default_schemas(cls): + schema_json_files = list() + for root, _, files in os.walk(cls.default_schema_path): + for f in files: + if f.endswith(RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX): + schema_json_files.append(os.path.join(root, f)) + logger.info( + f"Loading the default runtime env schemas: {schema_json_files}." + ) + cls._load_schemas(schema_json_files) + + @classmethod + def _load_schemas_from_env_var(cls): + # The format of env var: + # "/path/to/env_1_schema.json,/path/to/env_2_schema.json,/path/to/schemas_dir/" + schema_paths = os.environ.get(RAY_RUNTIME_ENV_PLUGIN_SCHEMAS_ENV_VAR) + if schema_paths: + schema_json_files = list() + for path in schema_paths.split(","): + if path.endswith(RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX): + schema_json_files.append(path) + elif os.path.isdir(path): + for root, _, files in os.walk(path): + for f in files: + if f.endswith(RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX): + schema_json_files.append(os.path.join(root, f)) + logger.info( + f"Loading the runtime env schemas from env var: {schema_json_files}." + ) + cls._load_schemas(schema_json_files) + + @classmethod + def validate(cls, name, instance): + if not cls.loaded: + # Load the schemas lazily. + cls._load_default_schemas() + cls._load_schemas_from_env_var() + cls.loaded = True + # if no schema matches, skip the validation. + if name in cls.schemas: + jsonschema.validate(instance=instance, schema=cls.schemas[name]) + + @classmethod + def clear(cls): + cls.schemas.clear() + cls.loaded = False diff --git a/python/ray/_private/runtime_env/schemas/pip_schema.json b/python/ray/_private/runtime_env/schemas/pip_schema.json new file mode 100644 index 000000000..f47100df4 --- /dev/null +++ b/python/ray/_private/runtime_env/schemas/pip_schema.json @@ -0,0 +1,50 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://github.com/ray-project/ray/runtime_env/pip_schema.json", + "title": "pip", + "description": "A pip environment specification.", + "oneOf": [ + { + "type": "object", + "properties": { + "packages": { + "oneOf": [ + { + "type": "array", + "items": { + "type": "string" + }, + "description": "a list of pip packages" + }, + { + "type": "string", + "description": "the path to a pip `requirements.txt` file" + } + ] + }, + "pip_check": { + "type": "boolean", + "description": "whether to enable pip check at the end of pip install" + }, + "pip_version": { + "type": "string", + "description": "the version of pip" + } + }, + "required": [ + "packages" + ] + }, + { + "type": "string", + "description": "the path to a pip `requirements.txt` file" + }, + { + "type": "array", + "items": { + "type": "string" + }, + "description": "a list of pip requirements specifiers" + } + ] + } diff --git a/python/ray/_private/runtime_env/schemas/working_dir_schema.json b/python/ray/_private/runtime_env/schemas/working_dir_schema.json new file mode 100644 index 000000000..4ec8ef018 --- /dev/null +++ b/python/ray/_private/runtime_env/schemas/working_dir_schema.json @@ -0,0 +1,7 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://github.com/ray-project/ray/runtime_env/working_dir_schema.json", + "title": "working_dir", + "type": "string", + "description": "Specifies the working directory for the Ray workers." +} diff --git a/python/ray/runtime_env.py b/python/ray/runtime_env.py index 46783df1d..b7a899750 100644 --- a/python/ray/runtime_env.py +++ b/python/ray/runtime_env.py @@ -10,6 +10,7 @@ import ray from ray._private.ray_constants import DEFAULT_RUNTIME_ENV_TIMEOUT_SECONDS from ray._private.runtime_env.conda import get_uri as get_conda_uri from ray._private.runtime_env.pip import get_uri as get_pip_uri +from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager from ray._private.runtime_env.validation import OPTION_TO_VALIDATION_FN from ray.core.generated.runtime_env_common_pb2 import RuntimeEnv as ProtoRuntimeEnv from ray.core.generated.runtime_env_common_pb2 import ( @@ -430,8 +431,8 @@ class RuntimeEnv(dict): return plugin_uris def __setitem__(self, key: str, value: Any) -> None: - # TODO(SongGuyang): Validate the schemas of plugins by json schema. res_value = value + RuntimeEnvPluginSchemaManager.validate(key, res_value) if key in RuntimeEnv.known_fields and key in OPTION_TO_VALIDATION_FN: res_value = OPTION_TO_VALIDATION_FN[key](value) if res_value is None: diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 03c8c93a1..cdd75bd05 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -21,6 +21,7 @@ import ray import ray._private.ray_constants as ray_constants import ray.util.client.server.server as ray_client_server from ray._private.runtime_env.pip import PipProcessor +from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager from ray._private.services import ( REDIS_EXECUTABLE, _start_redis_instance, @@ -933,3 +934,15 @@ def set_runtime_env_plugins(request): yield runtime_env_plugins finally: del os.environ["RAY_RUNTIME_ENV_PLUGINS"] + + +@pytest.fixture +def set_runtime_env_plugin_schemas(request): + runtime_env_plugin_schemas = getattr(request, "param", "0") + try: + os.environ["RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"] = runtime_env_plugin_schemas + # Clear and reload schemas. + RuntimeEnvPluginSchemaManager.clear() + yield runtime_env_plugin_schemas + finally: + del os.environ["RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"] diff --git a/python/ray/tests/test_runtime_env_validation.py b/python/ray/tests/test_runtime_env_validation.py index 902800b0c..a8cdb67e0 100644 --- a/python/ray/tests/test_runtime_env_validation.py +++ b/python/ray/tests/test_runtime_env_validation.py @@ -5,6 +5,7 @@ import tempfile from pathlib import Path from ray import job_config import yaml +import jsonschema from ray._private.runtime_env.validation import ( parse_and_validate_excludes, @@ -14,6 +15,7 @@ from ray._private.runtime_env.validation import ( parse_and_validate_env_vars, parse_and_validate_py_modules, ) +from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager from ray.runtime_env import RuntimeEnv CONDA_DICT = {"dependencies": ["pip", {"pip": ["pip-install-test==0.5"]}]} @@ -298,6 +300,83 @@ class TestParseJobConfig: assert config.metadata == {} +schemas_dir = os.path.dirname(__file__) +test_env_1 = os.path.join( + os.path.dirname(__file__), "test_runtime_env_validation_1_schema.json" +) +test_env_2 = os.path.join( + os.path.dirname(__file__), "test_runtime_env_validation_2_schema.json" +) + + +@pytest.mark.parametrize( + "set_runtime_env_plugin_schemas", + [schemas_dir, f"{test_env_1},{test_env_2}"], + indirect=True, +) +@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") +class TestValidateByJsonSchema: + def test_validate_pip(self, set_runtime_env_plugin_schemas): + runtime_env = RuntimeEnv() + runtime_env.set("pip", {"packages": ["requests"], "pip_check": True}) + with pytest.raises(jsonschema.exceptions.ValidationError, match="pip_check"): + runtime_env.set("pip", {"packages": ["requests"], "pip_check": "1"}) + runtime_env["pip"] = {"packages": ["requests"], "pip_check": True} + with pytest.raises(jsonschema.exceptions.ValidationError, match="pip_check"): + runtime_env["pip"] = {"packages": ["requests"], "pip_check": "1"} + + def test_validate_working_dir(self, set_runtime_env_plugin_schemas): + runtime_env = RuntimeEnv() + runtime_env.set("working_dir", "https://abc/file.zip") + with pytest.raises(jsonschema.exceptions.ValidationError, match="working_dir"): + runtime_env.set("working_dir", ["https://abc/file.zip"]) + runtime_env["working_dir"] = "https://abc/file.zip" + with pytest.raises(jsonschema.exceptions.ValidationError, match="working_dir"): + runtime_env["working_dir"] = ["https://abc/file.zip"] + + def test_validate_test_env_1(self, set_runtime_env_plugin_schemas): + runtime_env = RuntimeEnv() + runtime_env.set("test_env_1", {"array": ["123"], "bool": True}) + with pytest.raises(jsonschema.exceptions.ValidationError, match="bool"): + runtime_env.set("test_env_1", {"array": ["123"], "bool": "1"}) + + def test_validate_test_env_2(self, set_runtime_env_plugin_schemas): + runtime_env = RuntimeEnv() + runtime_env.set("test_env_2", "123") + with pytest.raises(jsonschema.exceptions.ValidationError, match="test_env_2"): + runtime_env.set("test_env_2", ["123"]) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") +class TestRuntimeEnvPluginSchemaManager: + def test(self): + RuntimeEnvPluginSchemaManager.clear() + # No schemas when starts. + assert len(RuntimeEnvPluginSchemaManager.schemas) == 0 + # When the `validate` is used first time, the schemas will be loaded lazily. + # The validation of pip is enabled. + with pytest.raises(jsonschema.exceptions.ValidationError, match="pip_check"): + RuntimeEnvPluginSchemaManager.validate( + "pip", {"packages": ["requests"], "pip_check": "123"} + ) + # The validation of test_env_1 is disabled because we haven't set the env var. + RuntimeEnvPluginSchemaManager.validate( + "test_env_1", {"array": ["123"], "bool": "123"} + ) + assert len(RuntimeEnvPluginSchemaManager.schemas) != 0 + # Set the thirdparty schemas. + os.environ["RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"] = schemas_dir + # clear the loaded schemas to make sure the schemas chould be reloaded next + # time. + RuntimeEnvPluginSchemaManager.clear() + assert len(RuntimeEnvPluginSchemaManager.schemas) == 0 + # The validation of test_env_1 is enabled. + with pytest.raises(jsonschema.exceptions.ValidationError, match="bool"): + RuntimeEnvPluginSchemaManager.validate( + "test_env_1", {"array": ["123"], "bool": "123"} + ) + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/tests/test_runtime_env_validation_1_schema.json b/python/ray/tests/test_runtime_env_validation_1_schema.json new file mode 100644 index 000000000..93f3362ea --- /dev/null +++ b/python/ray/tests/test_runtime_env_validation_1_schema.json @@ -0,0 +1,20 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://github.com/ray-project/ray/runtime_env/pip_schema.json", + "title": "test_env_1", + "type": "object", + "properties": { + "array": { + "type": "array", + "items": { + "type": "string" + } + }, + "bool": { + "type": "boolean" + } + }, + "required": [ + "array" + ] +} diff --git a/python/ray/tests/test_runtime_env_validation_2_schema.json b/python/ray/tests/test_runtime_env_validation_2_schema.json new file mode 100644 index 000000000..883de5665 --- /dev/null +++ b/python/ray/tests/test_runtime_env_validation_2_schema.json @@ -0,0 +1,6 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://github.com/ray-project/ray/runtime_env/working_dir_schema.json", + "title": "test_env_2", + "type": "string" +} diff --git a/python/ray/tests/test_runtime_env_validation_bad_1_schema.json b/python/ray/tests/test_runtime_env_validation_bad_1_schema.json new file mode 100644 index 000000000..45b4c3068 --- /dev/null +++ b/python/ray/tests/test_runtime_env_validation_bad_1_schema.json @@ -0,0 +1,5 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://github.com/ray-project/ray/runtime_env/working_dir_schema.json", + "type": "string" +} diff --git a/python/ray/tests/test_runtime_env_validation_bad_2_schema.json b/python/ray/tests/test_runtime_env_validation_bad_2_schema.json new file mode 100644 index 000000000..059b8ae9e --- /dev/null +++ b/python/ray/tests/test_runtime_env_validation_bad_2_schema.json @@ -0,0 +1,5 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "http://github.com/ray-project/ray/runtime_env/working_dir_schema.json", + "type": "string" + diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index a78bcb477..9744d245c 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -37,7 +37,7 @@ namespace gcs { /// from being too large to review. /// /// 1). Remove `node_resource_usages_` related code as it could be calculated from -/// `cluseter_resource_mananger` +/// `cluster_resource_manager` /// 2). Move all resource-write-related logic out from `gcs_resource_manager` /// 3). Move `placement_group_load_` from `gcs_resource_manager` to /// `placement_group_manager` and make `gcs_resource_manager` depend on