Revert "[Core] Unified worker initiators (#17401)" (#17935)

This reverts commit c3764ffd7d.
Eric Liang, 2021-08-18 18:06:24 -07:00 (committed by GitHub)
parent 89d83228f6
commit a9073d16f4
9 changed files with 28 additions and 115 deletions
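
For orientation, a rough sketch (editorial illustration, not part of the commit) of the net effect on how worker launch commands are assembled, based on the hunks below; setup_worker_path, worker_setup_hook, and session_dir are placeholder values standing in for what services.py supplies at runtime.

    import sys

    # Placeholders for values supplied by the node configuration.
    setup_worker_path = "path/to/setup_worker.py"
    worker_setup_hook = "some.module.setup_hook"
    session_dir = "/tmp/ray/session_xxx"

    # Before the revert (unified initiators): every worker language, including
    # Java, was wrapped in the Python setup_worker shim.
    java_cmd_before = [
        sys.executable, setup_worker_path,
        f"--worker-setup-hook={worker_setup_hook}",
        f"--session-dir={session_dir}",
        "--worker-entrypoint=java",
        "--worker-language=java",
    ]  # + ["-D{}={}".format(*pair) for pair in pairs]

    # After the revert: Java workers are started directly by the JVM.
    java_cmd_after = ["java"]  # + ["-D{}={}".format(*pair) for pair in pairs]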

@@ -48,10 +48,6 @@ ABSL_FLAG(std::string, ray_logs_dir, "", "Logs dir for workers.");
 ABSL_FLAG(std::string, ray_node_ip_address, "", "The ip address for this node.");
-/// flag serialized_runtime_env is added in setup_runtime_env.py.
-ABSL_FLAG(std::string, serialized_runtime_env, "{}",
-          "The serialized parsed runtime env dict.");
 namespace ray {
 namespace internal {

@@ -13,10 +13,6 @@ public class WorkerJvmOptionsTest extends BaseTest {
     String getOptions() {
      return System.getProperty("test.suffix");
    }
-    String getEnv(String key) {
-      return System.getProperty(key);
-    }
  }
  @Test(groups = {"cluster"})
@@ -29,8 +25,5 @@ public class WorkerJvmOptionsTest extends BaseTest {
        .remote();
    ObjectRef<String> obj = actor.task(Echo::getOptions).remote();
    Assert.assertEquals(obj.get(), "suffix");
-    // Auto injected by setup_runtime_env.py
-    ObjectRef<String> env = actor.task(Echo::getEnv, "serialized-runtime-env").remote();
-    Assert.assertEquals(env.get(), "{}");
  }
 }

@@ -1451,8 +1451,6 @@ def start_raylet(redis_address,
     include_java = has_java_command and ray_java_installed
     if include_java is True:
         java_worker_command = build_java_worker_command(
-            setup_worker_path,
-            worker_setup_hook,
             redis_address,
             plasma_store_name,
             raylet_name,
@@ -1465,9 +1463,8 @@ def start_raylet(redis_address,
     if os.path.exists(DEFAULT_WORKER_EXECUTABLE):
         cpp_worker_command = build_cpp_worker_command(
-            "", setup_worker_path, worker_setup_hook, redis_address,
-            plasma_store_name, raylet_name, redis_password, session_dir,
-            log_dir, node_ip_address)
+            "", redis_address, plasma_store_name, raylet_name, redis_password,
+            session_dir, log_dir, node_ip_address)
     else:
         cpp_worker_command = []
@@ -1475,14 +1472,11 @@ def start_raylet(redis_address,
     # TODO(architkulkarni): Pipe in setup worker args separately instead of
     # inserting them into start_worker_command and later erasing them if
     # needed.
-    python_executable = sys.executable
     start_worker_command = [
-        python_executable,
+        sys.executable,
         setup_worker_path,
         f"--worker-setup-hook={worker_setup_hook}",
         f"--session-dir={session_dir}",
-        f"--worker-entrypoint={python_executable}",
-        "--worker-language=python",
         worker_path,
         f"--node-ip-address={node_ip_address}",
         "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
@@ -1599,8 +1593,6 @@ def get_ray_jars_dir():
 def build_java_worker_command(
-        setup_worker_path,
-        worker_setup_hook,
         redis_address,
         plasma_store_name,
         raylet_name,
@@ -1642,12 +1634,7 @@ def build_java_worker_command(
     pairs.append(("ray.home", RAY_HOME))
     pairs.append(("ray.logging.dir", os.path.join(session_dir, "logs")))
     pairs.append(("ray.session-dir", session_dir))
-    command = [sys.executable, setup_worker_path]
-    command += [f"--worker-setup-hook={worker_setup_hook}"]
-    command += [f"--session-dir={session_dir}"]
-    command += ["--worker-entrypoint=java"]
-    command += ["--worker-language=java"]
-    command += ["-D{}={}".format(*pair) for pair in pairs]
+    command = ["java"] + ["-D{}={}".format(*pair) for pair in pairs]
     # Add ray jars path to java classpath
     ray_jars = os.path.join(get_ray_jars_dir(), "*")
@@ -1659,8 +1646,7 @@ def build_java_worker_command(
     return command
-def build_cpp_worker_command(cpp_worker_options, setup_worker_path,
-                             worker_setup_hook, redis_address,
+def build_cpp_worker_command(cpp_worker_options, redis_address,
                              plasma_store_name, raylet_name, redis_password,
                              session_dir, log_dir, node_ip_address):
     """This method assembles the command used to start a CPP worker.
@@ -1680,12 +1666,7 @@ def build_cpp_worker_command(cpp_worker_options, setup_worker_path,
     """
     command = [
-        sys.executable,
-        setup_worker_path,
-        f"--worker-setup-hook={worker_setup_hook}",
-        f"--session-dir={session_dir}",
-        f"--worker-entrypoint={DEFAULT_WORKER_EXECUTABLE}",
-        "--worker-language=cpp",
+        DEFAULT_WORKER_EXECUTABLE,
         f"--ray_plasma_store_socket_name={plasma_store_name}",
         f"--ray_raylet_socket_name={raylet_name}",
         "--ray_node_manager_port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
@@ -1935,12 +1916,10 @@ def start_ray_client_server(redis_address,
     conda_shim_flag = (
         "--worker-setup-hook=" + ray_constants.DEFAULT_WORKER_SETUP_HOOK)
-    python_executable = sys.executable
     command = [
-        python_executable,
+        sys.executable,
         setup_worker_path,
         conda_shim_flag,  # These two args are to use the shim process.
-        f"--worker-entrypoint={python_executable}",
         "-m",
         "ray.util.client.server",
         "--redis-address=" + str(redis_address),

@@ -20,6 +20,7 @@ py_test_module_list(
         "tests/test_variable_mutable.py",
         "tests/test_large_intermediate.py",
         "tests/test_signature_check.py",
+        "tests/test_basic_workflows_2.py",
     ],
     size = "small",
     extra_srcs = SRCS,
@@ -30,7 +31,6 @@
 py_test_module_list(
     files = [
         "tests/test_basic_workflows.py",
-        "tests/test_basic_workflows_2.py",
         "tests/test_recovery.py",
         "tests/test_lifetime.py",
         "tests/test_workflow_manager.py",

@@ -30,22 +30,10 @@ parser.add_argument(
 parser.add_argument(
     "--session-dir", type=str, help="the directory for the current session")
-parser.add_argument(
-    "--worker-entrypoint",
-    type=str,
-    help="the worker entrypoint: python,java etc. ")
-parser.add_argument(
-    "--worker-language",
-    type=str,
-    help="the worker entrypoint: python,java,cpp etc.")
 args, remaining_args = parser.parse_known_args()
 # add worker-shim-pid argument
 remaining_args.append("--worker-shim-pid={}".format(os.getpid()))
-env_vars = {"worker-shim-pid": str(os.getpid())}
-os.environ.update(env_vars)
 py_executable: str = sys.executable
 command_str = " ".join([f"exec {py_executable}"] + remaining_args)
 child_pid = os.fork()
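
The mock is cut off at the fork call above; for readers unfamiliar with the pattern, a hedged sketch (not taken from the file) of how a fork-then-exec shim like this typically finishes; command_str here is a stand-in for the string the mock assembles from its CLI arguments.

    import os
    import sys

    # Stand-in worker command; the real mock joins "exec", the Python
    # executable, and the remaining CLI args.
    command_str = f"exec {sys.executable} -c 'print(1)'"

    child_pid = os.fork()
    if child_pid == 0:
        # Child: replace this process with a shell running the worker command.
        os.execvp("bash", ["bash", "-c", command_str])
    # Parent (the shim) waits for the worker to exit.
    os.waitpid(child_pid, 0)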

@@ -40,30 +40,5 @@ def test_fork_process_in_runtime_env(ray_start_cluster):
     assert result == 1
-@pytest.mark.skipif(
-    sys.platform == "win32", reason="Fork API is not supported on Windows")
-def test_unified_fork_process(ray_start_cluster):
-    cluster = ray_start_cluster
-    directory = os.path.dirname(os.path.realpath(__file__))
-    # The job runs without runtime env, the raylet still use setup_worker
-    # to start worker.
-    setup_worker_path = os.path.join(directory, "mock_setup_worker.py")
-    cluster.add_node(num_cpus=1, setup_worker_path=setup_worker_path)
-    ray.init(address=cluster.address)
-    @ray.remote
-    class Actor():
-        def __init__(self):
-            pass
-        def get_shim_pid(self):
-            return os.environ["worker-shim-pid"]
-    a1 = Actor.options().remote()
-    obj_ref1 = a1.get_shim_pid.remote()
-    result = ray.get(obj_ref1)
-    assert result
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))

@@ -67,7 +67,7 @@ py_test(
 py_test(
     name = "test_convergence",
-    size = "large",
+    size = "medium",
     srcs = ["tests/test_convergence.py"],
     deps = [":tune_lib"],
     tags = ["exclusive"],

@@ -25,16 +25,6 @@ from ray.workers.pluggable_runtime_env import (RuntimeEnvContext,
 logger = logging.getLogger(__name__)
 parser = argparse.ArgumentParser()
-parser.add_argument(
-    "--worker-entrypoint",
-    type=str,
-    help="the worker entrypoint: python,java etc. ")
-parser.add_argument(
-    "--worker-language",
-    type=str,
-    help="the worker entrypoint: python,java,cpp etc.")
 parser.add_argument(
     "--serialized-runtime-env",
     type=str,
@@ -149,8 +139,7 @@ def setup_worker(input_args):
     args, remaining_args = parser.parse_known_args(args=input_args)
     commands = []
-    worker_executable: str = args.worker_entrypoint
-    worker_language: str = args.worker_language
+    py_executable: str = sys.executable
     runtime_env: dict = json.loads(args.serialized_runtime_env or "{}")
     runtime_env_context: RuntimeEnvContext = None
@@ -165,7 +154,7 @@ def setup_worker(input_args):
     # activate conda
     if runtime_env_context and runtime_env_context.conda_env_name:
-        worker_executable = "python"
+        py_executable = "python"
         conda_activate_commands = get_conda_activate_commands(
             runtime_env_context.conda_env_name)
         if (conda_activate_commands):
@@ -177,29 +166,14 @@ def setup_worker(input_args):
         "the context %s.", args.serialized_runtime_env,
         args.serialized_runtime_env_context)
-    worker_command = [f"exec {worker_executable}"]
-    if worker_language == "java":
-        # Java worker don't parse the command parameters, add option.
-        remaining_args.insert(
-            len(remaining_args) - 1, "-D{}={}".format(
-                "serialized-runtime-env", f"'{args.serialized_runtime_env}'"))
-        worker_command += remaining_args
-    elif worker_language == "cpp":
-        worker_command += remaining_args
-        # cpp worker flags must use underscore
-        worker_command += [
-            "--serialized_runtime_env", f"'{args.serialized_runtime_env}'"
-        ]
-    else:
-        worker_command += remaining_args
-        # Pass the runtime for working_dir setup.
-        # We can't do it in shim process here because it requires
-        # connection to gcs.
-        worker_command += [
-            "--serialized-runtime-env", f"'{args.serialized_runtime_env}'"
-        ]
-    commands += [" ".join(worker_command)]
+    commands += [
+        " ".join(
+            [f"exec {py_executable}"] + remaining_args +
+            # Pass the runtime for working_dir setup.
+            # We can't do it in shim process here because it requires
+            # connection to gcs.
+            ["--serialized-runtime-env", f"'{args.serialized_runtime_env}'"])
+    ]
     command_separator = " && "
     command_str = command_separator.join(commands)
@@ -207,7 +181,6 @@ def setup_worker(input_args):
     if runtime_env.get("env_vars"):
         env_vars = runtime_env["env_vars"]
         os.environ.update(env_vars)
     os.execvp("bash", ["bash", "-c", command_str])
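
Putting the restored pieces of setup_worker together, a hedged sketch (illustration only, not part of the commit) of the string the shim ends up handing to bash; conda_activate_commands, remaining_args, and serialized_runtime_env are placeholder values.

    import sys

    # Placeholders; the real values come from the parsed args and the
    # runtime env context.
    conda_activate_commands = ["conda activate my_env"]
    remaining_args = ["default_worker.py", "--node-ip-address=127.0.0.1"]
    serialized_runtime_env = "{}"

    commands = []
    commands += conda_activate_commands
    commands += [
        " ".join([f"exec {sys.executable}"] + remaining_args +
                 ["--serialized-runtime-env", f"'{serialized_runtime_env}'"])
    ]
    # e.g. "conda activate my_env && exec /usr/bin/python3 default_worker.py
    #       --node-ip-address=127.0.0.1 --serialized-runtime-env '{}'"
    command_str = " && ".join(commands)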

@@ -327,6 +327,15 @@ Process WorkerPool::StartWorkerProcess(
     // Allocated_resource_json is only used in "shim process".
     worker_command_args.push_back("--allocated-instances-serialized-json=" +
                                   allocated_instances_serialized_json);
+  } else {
+    // The "shim process" setup worker is not needed, so do not run it.
+    // Check that the arg really is the path to the setup worker before erasing it, to
+    // prevent breaking tests that mock out the worker command args.
+    if (worker_command_args.size() >= 4 &&
+        worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) {
+      worker_command_args.erase(worker_command_args.begin() + 1,
+                                worker_command_args.begin() + 4);
+    }
   }
   worker_command_args.push_back("--runtime-env-hash=" +