From a9073d16f4060a20943f4ef0ca06bcd1ba4c69bd Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 18 Aug 2021 18:06:24 -0700 Subject: [PATCH] Revert "[Core] Unified worker initiators (#17401)" (#17935) This reverts commit c3764ffd7d04afd10b384872d50d0fbaaef23f54. --- cpp/src/ray/config_internal.cc | 4 -- .../io/ray/test/WorkerJvmOptionsTest.java | 7 --- python/ray/_private/services.py | 35 +++----------- python/ray/experimental/workflow/BUILD | 2 +- python/ray/tests/mock_setup_worker.py | 12 ----- .../tests/test_runtime_env_fork_process.py | 25 ---------- python/ray/tune/BUILD | 2 +- python/ray/workers/setup_runtime_env.py | 47 ++++--------------- src/ray/raylet/worker_pool.cc | 9 ++++ 9 files changed, 28 insertions(+), 115 deletions(-) diff --git a/cpp/src/ray/config_internal.cc b/cpp/src/ray/config_internal.cc index 63903a1b7..ce85dc5fc 100644 --- a/cpp/src/ray/config_internal.cc +++ b/cpp/src/ray/config_internal.cc @@ -48,10 +48,6 @@ ABSL_FLAG(std::string, ray_logs_dir, "", "Logs dir for workers."); ABSL_FLAG(std::string, ray_node_ip_address, "", "The ip address for this node."); -/// flag serialized_runtime_env is added in setup_runtime_env.py. -ABSL_FLAG(std::string, serialized_runtime_env, "{}", - "The serialized parsed runtime env dict."); - namespace ray { namespace internal { diff --git a/java/test/src/main/java/io/ray/test/WorkerJvmOptionsTest.java b/java/test/src/main/java/io/ray/test/WorkerJvmOptionsTest.java index 6c01dcd9b..27b070496 100644 --- a/java/test/src/main/java/io/ray/test/WorkerJvmOptionsTest.java +++ b/java/test/src/main/java/io/ray/test/WorkerJvmOptionsTest.java @@ -13,10 +13,6 @@ public class WorkerJvmOptionsTest extends BaseTest { String getOptions() { return System.getProperty("test.suffix"); } - - String getEnv(String key) { - return System.getProperty(key); - } } @Test(groups = {"cluster"}) @@ -29,8 +25,5 @@ public class WorkerJvmOptionsTest extends BaseTest { .remote(); ObjectRef obj = actor.task(Echo::getOptions).remote(); Assert.assertEquals(obj.get(), "suffix"); - // Auto injected by setup_runtime_env.py - ObjectRef env = actor.task(Echo::getEnv, "serialized-runtime-env").remote(); - Assert.assertEquals(env.get(), "{}"); } } diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 914d838e2..b3a0a85e2 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -1451,8 +1451,6 @@ def start_raylet(redis_address, include_java = has_java_command and ray_java_installed if include_java is True: java_worker_command = build_java_worker_command( - setup_worker_path, - worker_setup_hook, redis_address, plasma_store_name, raylet_name, @@ -1465,9 +1463,8 @@ def start_raylet(redis_address, if os.path.exists(DEFAULT_WORKER_EXECUTABLE): cpp_worker_command = build_cpp_worker_command( - "", setup_worker_path, worker_setup_hook, redis_address, - plasma_store_name, raylet_name, redis_password, session_dir, - log_dir, node_ip_address) + "", redis_address, plasma_store_name, raylet_name, redis_password, + session_dir, log_dir, node_ip_address) else: cpp_worker_command = [] @@ -1475,14 +1472,11 @@ def start_raylet(redis_address, # TODO(architkulkarni): Pipe in setup worker args separately instead of # inserting them into start_worker_command and later erasing them if # needed. - python_executable = sys.executable start_worker_command = [ - python_executable, + sys.executable, setup_worker_path, f"--worker-setup-hook={worker_setup_hook}", f"--session-dir={session_dir}", - f"--worker-entrypoint={python_executable}", - "--worker-language=python", worker_path, f"--node-ip-address={node_ip_address}", "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER", @@ -1599,8 +1593,6 @@ def get_ray_jars_dir(): def build_java_worker_command( - setup_worker_path, - worker_setup_hook, redis_address, plasma_store_name, raylet_name, @@ -1642,12 +1634,7 @@ def build_java_worker_command( pairs.append(("ray.home", RAY_HOME)) pairs.append(("ray.logging.dir", os.path.join(session_dir, "logs"))) pairs.append(("ray.session-dir", session_dir)) - command = [sys.executable, setup_worker_path] - command += [f"--worker-setup-hook={worker_setup_hook}"] - command += [f"--session-dir={session_dir}"] - command += ["--worker-entrypoint=java"] - command += ["--worker-language=java"] - command += ["-D{}={}".format(*pair) for pair in pairs] + command = ["java"] + ["-D{}={}".format(*pair) for pair in pairs] # Add ray jars path to java classpath ray_jars = os.path.join(get_ray_jars_dir(), "*") @@ -1659,8 +1646,7 @@ def build_java_worker_command( return command -def build_cpp_worker_command(cpp_worker_options, setup_worker_path, - worker_setup_hook, redis_address, +def build_cpp_worker_command(cpp_worker_options, redis_address, plasma_store_name, raylet_name, redis_password, session_dir, log_dir, node_ip_address): """This method assembles the command used to start a CPP worker. @@ -1680,12 +1666,7 @@ def build_cpp_worker_command(cpp_worker_options, setup_worker_path, """ command = [ - sys.executable, - setup_worker_path, - f"--worker-setup-hook={worker_setup_hook}", - f"--session-dir={session_dir}", - f"--worker-entrypoint={DEFAULT_WORKER_EXECUTABLE}", - "--worker-language=cpp", + DEFAULT_WORKER_EXECUTABLE, f"--ray_plasma_store_socket_name={plasma_store_name}", f"--ray_raylet_socket_name={raylet_name}", "--ray_node_manager_port=RAY_NODE_MANAGER_PORT_PLACEHOLDER", @@ -1935,12 +1916,10 @@ def start_ray_client_server(redis_address, conda_shim_flag = ( "--worker-setup-hook=" + ray_constants.DEFAULT_WORKER_SETUP_HOOK) - python_executable = sys.executable command = [ - python_executable, + sys.executable, setup_worker_path, conda_shim_flag, # These two args are to use the shim process. - f"--worker-entrypoint={python_executable}", "-m", "ray.util.client.server", "--redis-address=" + str(redis_address), diff --git a/python/ray/experimental/workflow/BUILD b/python/ray/experimental/workflow/BUILD index 697a24468..fce54f921 100644 --- a/python/ray/experimental/workflow/BUILD +++ b/python/ray/experimental/workflow/BUILD @@ -20,6 +20,7 @@ py_test_module_list( "tests/test_variable_mutable.py", "tests/test_large_intermediate.py", "tests/test_signature_check.py", + "tests/test_basic_workflows_2.py", ], size = "small", extra_srcs = SRCS, @@ -30,7 +31,6 @@ py_test_module_list( py_test_module_list( files = [ "tests/test_basic_workflows.py", - "tests/test_basic_workflows_2.py", "tests/test_recovery.py", "tests/test_lifetime.py", "tests/test_workflow_manager.py", diff --git a/python/ray/tests/mock_setup_worker.py b/python/ray/tests/mock_setup_worker.py index 1967b9a02..a19a9ce22 100644 --- a/python/ray/tests/mock_setup_worker.py +++ b/python/ray/tests/mock_setup_worker.py @@ -30,22 +30,10 @@ parser.add_argument( parser.add_argument( "--session-dir", type=str, help="the directory for the current session") -parser.add_argument( - "--worker-entrypoint", - type=str, - help="the worker entrypoint: python,java etc. ") - -parser.add_argument( - "--worker-language", - type=str, - help="the worker entrypoint: python,java,cpp etc.") - args, remaining_args = parser.parse_known_args() # add worker-shim-pid argument remaining_args.append("--worker-shim-pid={}".format(os.getpid())) -env_vars = {"worker-shim-pid": str(os.getpid())} -os.environ.update(env_vars) py_executable: str = sys.executable command_str = " ".join([f"exec {py_executable}"] + remaining_args) child_pid = os.fork() diff --git a/python/ray/tests/test_runtime_env_fork_process.py b/python/ray/tests/test_runtime_env_fork_process.py index 9b56d5b28..c231d50af 100644 --- a/python/ray/tests/test_runtime_env_fork_process.py +++ b/python/ray/tests/test_runtime_env_fork_process.py @@ -40,30 +40,5 @@ def test_fork_process_in_runtime_env(ray_start_cluster): assert result == 1 -@pytest.mark.skipif( - sys.platform == "win32", reason="Fork API is not supported on Windows") -def test_unified_fork_process(ray_start_cluster): - cluster = ray_start_cluster - directory = os.path.dirname(os.path.realpath(__file__)) - # The job runs without runtime env, the raylet still use setup_worker - # to start worker. - setup_worker_path = os.path.join(directory, "mock_setup_worker.py") - cluster.add_node(num_cpus=1, setup_worker_path=setup_worker_path) - ray.init(address=cluster.address) - - @ray.remote - class Actor(): - def __init__(self): - pass - - def get_shim_pid(self): - return os.environ["worker-shim-pid"] - - a1 = Actor.options().remote() - obj_ref1 = a1.get_shim_pid.remote() - result = ray.get(obj_ref1) - assert result - - if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tune/BUILD b/python/ray/tune/BUILD index d3de3e0b8..b6332d656 100644 --- a/python/ray/tune/BUILD +++ b/python/ray/tune/BUILD @@ -67,7 +67,7 @@ py_test( py_test( name = "test_convergence", - size = "large", + size = "medium", srcs = ["tests/test_convergence.py"], deps = [":tune_lib"], tags = ["exclusive"], diff --git a/python/ray/workers/setup_runtime_env.py b/python/ray/workers/setup_runtime_env.py index ae25bd87a..822c0075c 100644 --- a/python/ray/workers/setup_runtime_env.py +++ b/python/ray/workers/setup_runtime_env.py @@ -25,16 +25,6 @@ from ray.workers.pluggable_runtime_env import (RuntimeEnvContext, logger = logging.getLogger(__name__) parser = argparse.ArgumentParser() -parser.add_argument( - "--worker-entrypoint", - type=str, - help="the worker entrypoint: python,java etc. ") - -parser.add_argument( - "--worker-language", - type=str, - help="the worker entrypoint: python,java,cpp etc.") - parser.add_argument( "--serialized-runtime-env", type=str, @@ -149,8 +139,7 @@ def setup_worker(input_args): args, remaining_args = parser.parse_known_args(args=input_args) commands = [] - worker_executable: str = args.worker_entrypoint - worker_language: str = args.worker_language + py_executable: str = sys.executable runtime_env: dict = json.loads(args.serialized_runtime_env or "{}") runtime_env_context: RuntimeEnvContext = None @@ -165,7 +154,7 @@ def setup_worker(input_args): # activate conda if runtime_env_context and runtime_env_context.conda_env_name: - worker_executable = "python" + py_executable = "python" conda_activate_commands = get_conda_activate_commands( runtime_env_context.conda_env_name) if (conda_activate_commands): @@ -177,29 +166,14 @@ def setup_worker(input_args): "the context %s.", args.serialized_runtime_env, args.serialized_runtime_env_context) - worker_command = [f"exec {worker_executable}"] - if worker_language == "java": - # Java worker don't parse the command parameters, add option. - remaining_args.insert( - len(remaining_args) - 1, "-D{}={}".format( - "serialized-runtime-env", f"'{args.serialized_runtime_env}'")) - worker_command += remaining_args - elif worker_language == "cpp": - worker_command += remaining_args - # cpp worker flags must use underscore - worker_command += [ - "--serialized_runtime_env", f"'{args.serialized_runtime_env}'" - ] - else: - worker_command += remaining_args - # Pass the runtime for working_dir setup. - # We can't do it in shim process here because it requires - # connection to gcs. - worker_command += [ - "--serialized-runtime-env", f"'{args.serialized_runtime_env}'" - ] - - commands += [" ".join(worker_command)] + commands += [ + " ".join( + [f"exec {py_executable}"] + remaining_args + + # Pass the runtime for working_dir setup. + # We can't do it in shim process here because it requires + # connection to gcs. + ["--serialized-runtime-env", f"'{args.serialized_runtime_env}'"]) + ] command_separator = " && " command_str = command_separator.join(commands) @@ -207,7 +181,6 @@ def setup_worker(input_args): if runtime_env.get("env_vars"): env_vars = runtime_env["env_vars"] os.environ.update(env_vars) - os.execvp("bash", ["bash", "-c", command_str]) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 1e3fa931a..3c075c1e1 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -327,6 +327,15 @@ Process WorkerPool::StartWorkerProcess( // Allocated_resource_json is only used in "shim process". worker_command_args.push_back("--allocated-instances-serialized-json=" + allocated_instances_serialized_json); + } else { + // The "shim process" setup worker is not needed, so do not run it. + // Check that the arg really is the path to the setup worker before erasing it, to + // prevent breaking tests that mock out the worker command args. + if (worker_command_args.size() >= 4 && + worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) { + worker_command_args.erase(worker_command_args.begin() + 1, + worker_command_args.begin() + 4); + } } worker_command_args.push_back("--runtime-env-hash=" +