From 661ac4e37bb5b920c8ed27b0f1a201d9224d7f8b Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 19 Aug 2021 21:08:13 -0700 Subject: [PATCH] Remove last traces of ref-counting flag (#17932) --- cpp/src/ray/util/process_helper.cc | 1 - java/BUILD.bazel | 3 --- java/build-jar-multiplatform.sh | 24 ++++--------------- python/ray/_raylet.pyx | 1 - python/ray/includes/libcoreworker.pxd | 1 - src/ray/common/ray_config_def.h | 3 --- src/ray/core_worker/core_worker.cc | 7 ++---- src/ray/core_worker/core_worker.h | 5 +--- .../java/io_ray_runtime_RayNativeRuntime.cc | 1 - src/ray/core_worker/test/core_worker_test.cc | 1 - src/ray/core_worker/test/mock_worker.cc | 1 - streaming/src/test/mock_actor.cc | 1 - streaming/src/test/queue_tests_base.h | 1 - 13 files changed, 7 insertions(+), 43 deletions(-) diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc index fe456b1ae..c2920a46f 100644 --- a/cpp/src/ray/util/process_helper.cc +++ b/cpp/src/ray/util/process_helper.cc @@ -193,7 +193,6 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback) options.node_manager_port = ConfigInternal::Instance().node_manager_port; options.raylet_ip_address = node_ip; options.driver_name = "cpp_worker"; - options.ref_counting_enabled = true; options.num_workers = 1; options.metrics_agent_port = -1; options.task_execution_callback = callback; diff --git a/java/BUILD.bazel b/java/BUILD.bazel index 2d98abb94..d01a93254 100644 --- a/java/BUILD.bazel +++ b/java/BUILD.bazel @@ -294,8 +294,6 @@ java_binary( "//java:io_ray_ray_api", "//java:io_ray_ray_runtime", "//java:io_ray_ray_serve", - "//streaming/java:io_ray_ray_streaming-api", - "//streaming/java:io_ray_ray_streaming-runtime", ], ) @@ -304,7 +302,6 @@ genrule( srcs = [ "//java:ray_dist_deploy.jar", "//java:gen_maven_deps", - "//streaming/java:gen_maven_deps", ], outs = ["ray_java_pkg.out"], cmd = """ diff --git a/java/build-jar-multiplatform.sh b/java/build-jar-multiplatform.sh index 0a1793eca..08490fae8 100755 --- a/java/build-jar-multiplatform.sh +++ b/java/build-jar-multiplatform.sh @@ -7,9 +7,8 @@ set -e ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)" WORKSPACE_DIR="${ROOT_DIR}/.." -JAVA_DIRS_PATH=('java' 'streaming/java') +JAVA_DIRS_PATH=('java') RAY_JAVA_MODULES=('api' 'runtime') -RAY_STREAMING_JAVA_MODULES=('streaming-api' 'streaming-runtime' 'streaming-state') JAR_BASE_DIR="$WORKSPACE_DIR"/.jar mkdir -p "$JAR_BASE_DIR" cd "$WORKSPACE_DIR/java" @@ -47,9 +46,8 @@ build_jars() { echo "Finished building jars for $p" done copy_jars "$JAR_DIR" - # ray runtime jar and streaming runtime jar are in a dir specifed by maven-jar-plugin + # ray runtime jar in a dir specifed by maven-jar-plugin cp -f "$WORKSPACE_DIR"/build/java/ray*.jar "$JAR_DIR" - cp -f "$WORKSPACE_DIR"/streaming/build/java/streaming*.jar "$JAR_DIR" echo "Finished building jar for $platform" } @@ -59,12 +57,8 @@ copy_jars() { for module in "${RAY_JAVA_MODULES[@]}"; do cp -f "$WORKSPACE_DIR"/java/"$module"/target/*jar "$JAR_DIR" done - for module in "${RAY_STREAMING_JAVA_MODULES[@]}"; do - cp -f "$WORKSPACE_DIR"/streaming/java/"$module"/target/*jar "$JAR_DIR" - done - # ray runtime jar and streaming runtime jar are in a dir specifed by maven-jar-plugin + # ray runtime jar and are in a dir specifed by maven-jar-plugin cp -f "$WORKSPACE_DIR"/build/java/ray*.jar "$JAR_DIR" - cp -f "$WORKSPACE_DIR"/streaming/build/java/streaming*.jar "$JAR_DIR" } # This function assuem all dependencies are installed already. @@ -85,7 +79,7 @@ build_jars_multiplatform() { return fi fi - if download_jars "ray-runtime-$version.jar" "streaming-runtime-$version.jar"; then + if download_jars "ray-runtime-$version.jar"; then prepare_native build_jars multiplatform false else @@ -137,11 +131,6 @@ prepare_native() { mkdir -p "$native_dir" rm -rf "$native_dir" mv "native/$os" "$native_dir" - jar xf "streaming-runtime-$version.jar" "native/$os" - local native_dir="$WORKSPACE_DIR/streaming/java/streaming-runtime/native_dependencies/native/$os" - mkdir -p "$native_dir" - rm -rf "$native_dir" - mv "native/$os" "$native_dir" done } @@ -151,7 +140,6 @@ native_files_exist() { for os in 'darwin' 'linux'; do native_dirs=() native_dirs+=("$WORKSPACE_DIR/java/runtime/native_dependencies/native/$os") - native_dirs+=("$WORKSPACE_DIR/streaming/java/streaming-runtime/native_dependencies/native/$os") for native_dir in "${native_dirs[@]}"; do if [ ! -d "$native_dir" ]; then echo "$native_dir doesn't exist" @@ -179,10 +167,6 @@ deploy_jars() { cd "$WORKSPACE_DIR/java" mvn -T16 install deploy -Dmaven.test.skip=true -Dcheckstyle.skip -Prelease -Dgpg.skip="${GPG_SKIP:-true}" ) - ( - cd "$WORKSPACE_DIR/streaming/java" - mvn -T16 deploy -Dmaven.test.skip=true -Dcheckstyle.skip -Prelease -Dgpg.skip="${GPG_SKIP:-true}" - ) echo "Finished deploying jars" else echo "Native bianries/libraries are not ready, skip deploying jars." diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 2b990a469..63ac8078f 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -959,7 +959,6 @@ cdef class CoreWorker: options.run_on_util_worker_handler = run_on_util_worker_handler options.unhandled_exception_handler = unhandled_exception_handler options.get_lang_stack = get_py_stack - options.ref_counting_enabled = True options.is_local_mode = local_mode options.num_workers = 1 options.kill_main = kill_main_task diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index cfc487582..a5e3191c7 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -295,7 +295,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: const c_vector[c_string]&) nogil) run_on_util_worker_handler (void(const CRayObject&) nogil) unhandled_exception_handler (void(c_string *stack_out) nogil) get_lang_stack - c_bool ref_counting_enabled c_bool is_local_mode int num_workers (c_bool() nogil) kill_main diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index c604c0d46..eb9d5dc15 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -71,9 +71,6 @@ RAY_CONFIG(bool, record_ref_creation_sites, true) /// cluster. If set, then this is the duration between attempts to flush the /// local cache. If this is set to 0, then the objects will be freed as soon as /// they enter the cache. To disable eager eviction, set this to -1. -/// NOTE(swang): If distributed_ref_counting_enabled is off, then this will -/// likely cause spurious object lost errors for Object IDs that were -/// serialized, then either passed as an argument or returned from a task. /// NOTE(swang): The timer is checked by the raylet during every heartbeat, so /// this should be set to a value larger than /// raylet_heartbeat_period_milliseconds. diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 2a4f5bbfc..0841c1e28 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -538,8 +538,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ PutObjectIntoPlasma(object, object_id); return Status::OK(); }, - options_.ref_counting_enabled ? reference_counter_ : nullptr, local_raylet_client_, - options_.check_signals, + reference_counter_, local_raylet_client_, options_.check_signals, [this](const RayObject &obj) { // Run this on the event loop to avoid calling back into the language runtime // from the middle of user operations. @@ -2211,9 +2210,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, RAY_LOG(DEBUG) << "Decrementing ref for borrowed ID " << borrowed_id; reference_counter_->RemoveLocalReference(borrowed_id, &deleted); } - if (options_.ref_counting_enabled) { - memory_store_->Delete(deleted); - } + memory_store_->Delete(deleted); if (task_spec.IsNormalTask() && reference_counter_->NumObjectIDsInScope() != 0) { RAY_LOG(DEBUG) diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index cc795a3d9..c4ca1c396 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -94,7 +94,6 @@ struct CoreWorkerOptions { unhandled_exception_handler(nullptr), get_lang_stack(nullptr), kill_main(nullptr), - ref_counting_enabled(false), is_local_mode(false), num_workers(0), terminate_asyncio_thread(nullptr), @@ -169,8 +168,6 @@ struct CoreWorkerOptions { std::function get_lang_stack; // Function that tries to interrupt the currently running Python thread. std::function kill_main; - /// Whether to enable object ref counting. - bool ref_counting_enabled; /// Is local mode being used. bool is_local_mode; /// The number of workers to be started in the current process. @@ -427,7 +424,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::vector deleted; reference_counter_->RemoveLocalReference(object_id, &deleted); // TOOD(ilr): better way of keeping an object from being deleted - if (options_.ref_counting_enabled && !options_.is_local_mode) { + if (!options_.is_local_mode) { memory_store_->Delete(deleted); } } diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index d6dec2806..5842ec531 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -244,7 +244,6 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize( options.task_execution_callback = task_execution_callback; options.on_worker_shutdown = on_worker_shutdown; options.gc_collect = gc_collect; - options.ref_counting_enabled = true; options.num_workers = static_cast(numWorkersPerProcess); options.serialized_job_config = serialized_job_config; options.metrics_agent_port = -1; diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index b62cb7dd9..e0b412bdf 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -145,7 +145,6 @@ class CoreWorkerTest : public ::testing::Test { options.node_manager_port = node_manager_port; options.raylet_ip_address = "127.0.0.1"; options.driver_name = "core_worker_test"; - options.ref_counting_enabled = true; options.num_workers = 1; options.metrics_agent_port = -1; CoreWorkerProcess::Initialize(options); diff --git a/src/ray/core_worker/test/mock_worker.cc b/src/ray/core_worker/test/mock_worker.cc index fcb00985d..aaa9fbba9 100644 --- a/src/ray/core_worker/test/mock_worker.cc +++ b/src/ray/core_worker/test/mock_worker.cc @@ -48,7 +48,6 @@ class MockWorker { options.raylet_ip_address = "127.0.0.1"; options.task_execution_callback = std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8, _9); - options.ref_counting_enabled = true; options.num_workers = 1; options.metrics_agent_port = -1; CoreWorkerProcess::Initialize(options); diff --git a/streaming/src/test/mock_actor.cc b/streaming/src/test/mock_actor.cc index 881fe8918..8ab5a5e90 100644 --- a/streaming/src/test/mock_actor.cc +++ b/streaming/src/test/mock_actor.cc @@ -501,7 +501,6 @@ class StreamingWorker { options.raylet_ip_address = "127.0.0.1"; options.task_execution_callback = std::bind(&StreamingWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8, _9); - options.ref_counting_enabled = true; options.num_workers = 1; options.metrics_agent_port = -1; CoreWorkerProcess::Initialize(options); diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h index 3f33aa2be..f768cc711 100644 --- a/streaming/src/test/queue_tests_base.h +++ b/streaming/src/test/queue_tests_base.h @@ -232,7 +232,6 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { options.node_manager_port = node_manager_port_; options.raylet_ip_address = "127.0.0.1"; options.driver_name = "queue_tests"; - options.ref_counting_enabled = true; options.num_workers = 1; options.metrics_agent_port = -1; InitShutdownRAII core_worker_raii(CoreWorkerProcess::Initialize,