Revert "[runtime env] plugin refactor[2/n]: support json schema validation (#26154)" (#26246)

This reverts commit 122ec5e52f.
This commit is contained in:
Guyang Song 2022-07-01 15:48:03 +08:00 committed by GitHub
parent c8a90e00ac
commit b9ade079cb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 3 additions and 283 deletions

View file

@ -3,9 +3,3 @@ RAY_JOB_CONFIG_JSON_ENV_VAR = "RAY_JOB_CONFIG_JSON_ENV_VAR"
# The plugins which should be loaded when ray cluster starts.
RAY_RUNTIME_ENV_PLUGINS_ENV_VAR = "RAY_RUNTIME_ENV_PLUGINS"
# The schema files or directories of plugins which should be loaded in workers.
RAY_RUNTIME_ENV_PLUGIN_SCHEMAS_ENV_VAR = "RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"
# The file suffix of runtime env plugin schemas.
RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX = ".json"

View file

@ -92,7 +92,7 @@ class RuntimeEnvPlugin(ABC):
class RuntimeEnvPluginManager:
"""This manager is used to load plugins in runtime env agent."""
"""This mananger is used to load plugins in runtime env agent."""
def __init__(self):
self.plugins = {}

View file

@ -1,88 +0,0 @@
import os
import jsonschema
import logging
from typing import List
import json
from ray._private.runtime_env.constants import (
RAY_RUNTIME_ENV_PLUGIN_SCHEMAS_ENV_VAR,
RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX,
)
logger = logging.getLogger(__name__)
class RuntimeEnvPluginSchemaManager:
"""This manager is used to load plugin json schemas."""
default_schema_path = os.path.join(os.path.dirname(__file__), "schemas")
schemas = {}
loaded = False
@classmethod
def _load_schemas(cls, schema_paths: List[str]):
for schema_path in schema_paths:
try:
schema = json.load(open(schema_path))
except json.decoder.JSONDecodeError:
logger.error("Invalid runtime env schema %s, skip it.", schema_path)
if "title" not in schema:
logger.error(
"No valid title in runtime env schema %s, skip it.", schema_path
)
continue
if schema["title"] in cls.schemas:
logger.error(
"The 'title' of runtime env schema %s conflicts with %s, skip it.",
schema_path,
cls.schemas[schema["title"]],
)
continue
cls.schemas[schema["title"]] = schema
@classmethod
def _load_default_schemas(cls):
schema_json_files = list()
for root, _, files in os.walk(cls.default_schema_path):
for f in files:
if f.endswith(RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX):
schema_json_files.append(os.path.join(root, f))
logger.info(
f"Loading the default runtime env schemas: {schema_json_files}."
)
cls._load_schemas(schema_json_files)
@classmethod
def _load_schemas_from_env_var(cls):
# The format of env var:
# "/path/to/env_1_schema.json,/path/to/env_2_schema.json,/path/to/schemas_dir/"
schema_paths = os.environ.get(RAY_RUNTIME_ENV_PLUGIN_SCHEMAS_ENV_VAR)
if schema_paths:
schema_json_files = list()
for path in schema_paths.split(","):
if path.endswith(RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX):
schema_json_files.append(path)
elif os.path.isdir(path):
for root, _, files in os.walk(path):
for f in files:
if f.endswith(RAY_RUNTIME_ENV_PLUGIN_SCHEMA_SUFFIX):
schema_json_files.append(os.path.join(root, f))
logger.info(
f"Loading the runtime env schemas from env var: {schema_json_files}."
)
cls._load_schemas(schema_json_files)
@classmethod
def validate(cls, name, instance):
if not cls.loaded:
# Load the schemas lazily.
cls._load_default_schemas()
cls._load_schemas_from_env_var()
cls.loaded = True
# if no schema matches, skip the validation.
if name in cls.schemas:
jsonschema.validate(instance=instance, schema=cls.schemas[name])
@classmethod
def clear(cls):
cls.schemas.clear()
cls.loaded = False

View file

@ -1,50 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://github.com/ray-project/ray/runtime_env/pip_schema.json",
"title": "pip",
"description": "A pip environment specification.",
"oneOf": [
{
"type": "object",
"properties": {
"packages": {
"oneOf": [
{
"type": "array",
"items": {
"type": "string"
},
"description": "a list of pip packages"
},
{
"type": "string",
"description": "the path to a pip `requirements.txt` file"
}
]
},
"pip_check": {
"type": "boolean",
"description": "whether to enable pip check at the end of pip install"
},
"pip_version": {
"type": "string",
"description": "the version of pip"
}
},
"required": [
"packages"
]
},
{
"type": "string",
"description": "the path to a pip `requirements.txt` file"
},
{
"type": "array",
"items": {
"type": "string"
},
"description": "a list of pip requirements specifiers"
}
]
}

View file

@ -1,7 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://github.com/ray-project/ray/runtime_env/working_dir_schema.json",
"title": "working_dir",
"type": "string",
"description": "Specifies the working directory for the Ray workers."
}

View file

@ -10,7 +10,6 @@ import ray
from ray._private.ray_constants import DEFAULT_RUNTIME_ENV_TIMEOUT_SECONDS
from ray._private.runtime_env.conda import get_uri as get_conda_uri
from ray._private.runtime_env.pip import get_uri as get_pip_uri
from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager
from ray._private.runtime_env.validation import OPTION_TO_VALIDATION_FN
from ray.core.generated.runtime_env_common_pb2 import RuntimeEnv as ProtoRuntimeEnv
from ray.core.generated.runtime_env_common_pb2 import (
@ -431,8 +430,8 @@ class RuntimeEnv(dict):
return plugin_uris
def __setitem__(self, key: str, value: Any) -> None:
# TODO(SongGuyang): Validate the schemas of plugins by json schema.
res_value = value
RuntimeEnvPluginSchemaManager.validate(key, res_value)
if key in RuntimeEnv.known_fields and key in OPTION_TO_VALIDATION_FN:
res_value = OPTION_TO_VALIDATION_FN[key](value)
if res_value is None:

View file

@ -21,7 +21,6 @@ import ray
import ray._private.ray_constants as ray_constants
import ray.util.client.server.server as ray_client_server
from ray._private.runtime_env.pip import PipProcessor
from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager
from ray._private.services import (
REDIS_EXECUTABLE,
_start_redis_instance,
@ -934,15 +933,3 @@ def set_runtime_env_plugins(request):
yield runtime_env_plugins
finally:
del os.environ["RAY_RUNTIME_ENV_PLUGINS"]
@pytest.fixture
def set_runtime_env_plugin_schemas(request):
runtime_env_plugin_schemas = getattr(request, "param", "0")
try:
os.environ["RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"] = runtime_env_plugin_schemas
# Clear and reload schemas.
RuntimeEnvPluginSchemaManager.clear()
yield runtime_env_plugin_schemas
finally:
del os.environ["RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"]

View file

@ -5,7 +5,6 @@ import tempfile
from pathlib import Path
from ray import job_config
import yaml
import jsonschema
from ray._private.runtime_env.validation import (
parse_and_validate_excludes,
@ -15,7 +14,6 @@ from ray._private.runtime_env.validation import (
parse_and_validate_env_vars,
parse_and_validate_py_modules,
)
from ray._private.runtime_env.plugin_schema_manager import RuntimeEnvPluginSchemaManager
from ray.runtime_env import RuntimeEnv
CONDA_DICT = {"dependencies": ["pip", {"pip": ["pip-install-test==0.5"]}]}
@ -300,83 +298,6 @@ class TestParseJobConfig:
assert config.metadata == {}
schemas_dir = os.path.dirname(__file__)
test_env_1 = os.path.join(
os.path.dirname(__file__), "test_runtime_env_validation_1_schema.json"
)
test_env_2 = os.path.join(
os.path.dirname(__file__), "test_runtime_env_validation_2_schema.json"
)
@pytest.mark.parametrize(
"set_runtime_env_plugin_schemas",
[schemas_dir, f"{test_env_1},{test_env_2}"],
indirect=True,
)
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
class TestValidateByJsonSchema:
def test_validate_pip(self, set_runtime_env_plugin_schemas):
runtime_env = RuntimeEnv()
runtime_env.set("pip", {"packages": ["requests"], "pip_check": True})
with pytest.raises(jsonschema.exceptions.ValidationError, match="pip_check"):
runtime_env.set("pip", {"packages": ["requests"], "pip_check": "1"})
runtime_env["pip"] = {"packages": ["requests"], "pip_check": True}
with pytest.raises(jsonschema.exceptions.ValidationError, match="pip_check"):
runtime_env["pip"] = {"packages": ["requests"], "pip_check": "1"}
def test_validate_working_dir(self, set_runtime_env_plugin_schemas):
runtime_env = RuntimeEnv()
runtime_env.set("working_dir", "https://abc/file.zip")
with pytest.raises(jsonschema.exceptions.ValidationError, match="working_dir"):
runtime_env.set("working_dir", ["https://abc/file.zip"])
runtime_env["working_dir"] = "https://abc/file.zip"
with pytest.raises(jsonschema.exceptions.ValidationError, match="working_dir"):
runtime_env["working_dir"] = ["https://abc/file.zip"]
def test_validate_test_env_1(self, set_runtime_env_plugin_schemas):
runtime_env = RuntimeEnv()
runtime_env.set("test_env_1", {"array": ["123"], "bool": True})
with pytest.raises(jsonschema.exceptions.ValidationError, match="bool"):
runtime_env.set("test_env_1", {"array": ["123"], "bool": "1"})
def test_validate_test_env_2(self, set_runtime_env_plugin_schemas):
runtime_env = RuntimeEnv()
runtime_env.set("test_env_2", "123")
with pytest.raises(jsonschema.exceptions.ValidationError, match="test_env_2"):
runtime_env.set("test_env_2", ["123"])
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
class TestRuntimeEnvPluginSchemaManager:
def test(self):
RuntimeEnvPluginSchemaManager.clear()
# No schemas when starts.
assert len(RuntimeEnvPluginSchemaManager.schemas) == 0
# When the `validate` is used first time, the schemas will be loaded lazily.
# The validation of pip is enabled.
with pytest.raises(jsonschema.exceptions.ValidationError, match="pip_check"):
RuntimeEnvPluginSchemaManager.validate(
"pip", {"packages": ["requests"], "pip_check": "123"}
)
# The validation of test_env_1 is disabled because we haven't set the env var.
RuntimeEnvPluginSchemaManager.validate(
"test_env_1", {"array": ["123"], "bool": "123"}
)
assert len(RuntimeEnvPluginSchemaManager.schemas) != 0
# Set the thirdparty schemas.
os.environ["RAY_RUNTIME_ENV_PLUGIN_SCHEMAS"] = schemas_dir
# clear the loaded schemas to make sure the schemas chould be reloaded next
# time.
RuntimeEnvPluginSchemaManager.clear()
assert len(RuntimeEnvPluginSchemaManager.schemas) == 0
# The validation of test_env_1 is enabled.
with pytest.raises(jsonschema.exceptions.ValidationError, match="bool"):
RuntimeEnvPluginSchemaManager.validate(
"test_env_1", {"array": ["123"], "bool": "123"}
)
if __name__ == "__main__":
if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))

View file

@ -1,20 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://github.com/ray-project/ray/runtime_env/pip_schema.json",
"title": "test_env_1",
"type": "object",
"properties": {
"array": {
"type": "array",
"items": {
"type": "string"
}
},
"bool": {
"type": "boolean"
}
},
"required": [
"array"
]
}

View file

@ -1,6 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://github.com/ray-project/ray/runtime_env/working_dir_schema.json",
"title": "test_env_2",
"type": "string"
}

View file

@ -1,5 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://github.com/ray-project/ray/runtime_env/working_dir_schema.json",
"type": "string"
}

View file

@ -1,5 +0,0 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://github.com/ray-project/ray/runtime_env/working_dir_schema.json",
"type": "string"

View file

@ -37,7 +37,7 @@ namespace gcs {
/// from being too large to review.
///
/// 1). Remove `node_resource_usages_` related code as it could be calculated from
/// `cluster_resource_manager`
/// `cluseter_resource_mananger`
/// 2). Move all resource-write-related logic out from `gcs_resource_manager`
/// 3). Move `placement_group_load_` from `gcs_resource_manager` to
/// `placement_group_manager` and make `gcs_resource_manager` depend on