Remove last traces of ref-counting flag (#17932)

This commit is contained in:
Eric Liang 2021-08-19 21:08:13 -07:00 committed by GitHub
parent 36c26578a7
commit 661ac4e37b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 7 additions and 43 deletions

View file

@ -193,7 +193,6 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
options.node_manager_port = ConfigInternal::Instance().node_manager_port;
options.raylet_ip_address = node_ip;
options.driver_name = "cpp_worker";
options.ref_counting_enabled = true;
options.num_workers = 1;
options.metrics_agent_port = -1;
options.task_execution_callback = callback;

View file

@ -294,8 +294,6 @@ java_binary(
"//java:io_ray_ray_api",
"//java:io_ray_ray_runtime",
"//java:io_ray_ray_serve",
"//streaming/java:io_ray_ray_streaming-api",
"//streaming/java:io_ray_ray_streaming-runtime",
],
)
@ -304,7 +302,6 @@ genrule(
srcs = [
"//java:ray_dist_deploy.jar",
"//java:gen_maven_deps",
"//streaming/java:gen_maven_deps",
],
outs = ["ray_java_pkg.out"],
cmd = """

View file

@ -7,9 +7,8 @@ set -e
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)"
WORKSPACE_DIR="${ROOT_DIR}/.."
JAVA_DIRS_PATH=('java' 'streaming/java')
JAVA_DIRS_PATH=('java')
RAY_JAVA_MODULES=('api' 'runtime')
RAY_STREAMING_JAVA_MODULES=('streaming-api' 'streaming-runtime' 'streaming-state')
JAR_BASE_DIR="$WORKSPACE_DIR"/.jar
mkdir -p "$JAR_BASE_DIR"
cd "$WORKSPACE_DIR/java"
@ -47,9 +46,8 @@ build_jars() {
echo "Finished building jars for $p"
done
copy_jars "$JAR_DIR"
# ray runtime jar and streaming runtime jar are in a dir specifed by maven-jar-plugin
# ray runtime jar in a dir specifed by maven-jar-plugin
cp -f "$WORKSPACE_DIR"/build/java/ray*.jar "$JAR_DIR"
cp -f "$WORKSPACE_DIR"/streaming/build/java/streaming*.jar "$JAR_DIR"
echo "Finished building jar for $platform"
}
@ -59,12 +57,8 @@ copy_jars() {
for module in "${RAY_JAVA_MODULES[@]}"; do
cp -f "$WORKSPACE_DIR"/java/"$module"/target/*jar "$JAR_DIR"
done
for module in "${RAY_STREAMING_JAVA_MODULES[@]}"; do
cp -f "$WORKSPACE_DIR"/streaming/java/"$module"/target/*jar "$JAR_DIR"
done
# ray runtime jar and streaming runtime jar are in a dir specifed by maven-jar-plugin
# ray runtime jar and are in a dir specifed by maven-jar-plugin
cp -f "$WORKSPACE_DIR"/build/java/ray*.jar "$JAR_DIR"
cp -f "$WORKSPACE_DIR"/streaming/build/java/streaming*.jar "$JAR_DIR"
}
# This function assuem all dependencies are installed already.
@ -85,7 +79,7 @@ build_jars_multiplatform() {
return
fi
fi
if download_jars "ray-runtime-$version.jar" "streaming-runtime-$version.jar"; then
if download_jars "ray-runtime-$version.jar"; then
prepare_native
build_jars multiplatform false
else
@ -137,11 +131,6 @@ prepare_native() {
mkdir -p "$native_dir"
rm -rf "$native_dir"
mv "native/$os" "$native_dir"
jar xf "streaming-runtime-$version.jar" "native/$os"
local native_dir="$WORKSPACE_DIR/streaming/java/streaming-runtime/native_dependencies/native/$os"
mkdir -p "$native_dir"
rm -rf "$native_dir"
mv "native/$os" "$native_dir"
done
}
@ -151,7 +140,6 @@ native_files_exist() {
for os in 'darwin' 'linux'; do
native_dirs=()
native_dirs+=("$WORKSPACE_DIR/java/runtime/native_dependencies/native/$os")
native_dirs+=("$WORKSPACE_DIR/streaming/java/streaming-runtime/native_dependencies/native/$os")
for native_dir in "${native_dirs[@]}"; do
if [ ! -d "$native_dir" ]; then
echo "$native_dir doesn't exist"
@ -179,10 +167,6 @@ deploy_jars() {
cd "$WORKSPACE_DIR/java"
mvn -T16 install deploy -Dmaven.test.skip=true -Dcheckstyle.skip -Prelease -Dgpg.skip="${GPG_SKIP:-true}"
)
(
cd "$WORKSPACE_DIR/streaming/java"
mvn -T16 deploy -Dmaven.test.skip=true -Dcheckstyle.skip -Prelease -Dgpg.skip="${GPG_SKIP:-true}"
)
echo "Finished deploying jars"
else
echo "Native bianries/libraries are not ready, skip deploying jars."

View file

@ -959,7 +959,6 @@ cdef class CoreWorker:
options.run_on_util_worker_handler = run_on_util_worker_handler
options.unhandled_exception_handler = unhandled_exception_handler
options.get_lang_stack = get_py_stack
options.ref_counting_enabled = True
options.is_local_mode = local_mode
options.num_workers = 1
options.kill_main = kill_main_task

View file

@ -295,7 +295,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[c_string]&) nogil) run_on_util_worker_handler
(void(const CRayObject&) nogil) unhandled_exception_handler
(void(c_string *stack_out) nogil) get_lang_stack
c_bool ref_counting_enabled
c_bool is_local_mode
int num_workers
(c_bool() nogil) kill_main

View file

@ -71,9 +71,6 @@ RAY_CONFIG(bool, record_ref_creation_sites, true)
/// cluster. If set, then this is the duration between attempts to flush the
/// local cache. If this is set to 0, then the objects will be freed as soon as
/// they enter the cache. To disable eager eviction, set this to -1.
/// NOTE(swang): If distributed_ref_counting_enabled is off, then this will
/// likely cause spurious object lost errors for Object IDs that were
/// serialized, then either passed as an argument or returned from a task.
/// NOTE(swang): The timer is checked by the raylet during every heartbeat, so
/// this should be set to a value larger than
/// raylet_heartbeat_period_milliseconds.

View file

@ -538,8 +538,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
PutObjectIntoPlasma(object, object_id);
return Status::OK();
},
options_.ref_counting_enabled ? reference_counter_ : nullptr, local_raylet_client_,
options_.check_signals,
reference_counter_, local_raylet_client_, options_.check_signals,
[this](const RayObject &obj) {
// Run this on the event loop to avoid calling back into the language runtime
// from the middle of user operations.
@ -2211,9 +2210,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
RAY_LOG(DEBUG) << "Decrementing ref for borrowed ID " << borrowed_id;
reference_counter_->RemoveLocalReference(borrowed_id, &deleted);
}
if (options_.ref_counting_enabled) {
memory_store_->Delete(deleted);
}
memory_store_->Delete(deleted);
if (task_spec.IsNormalTask() && reference_counter_->NumObjectIDsInScope() != 0) {
RAY_LOG(DEBUG)

View file

@ -94,7 +94,6 @@ struct CoreWorkerOptions {
unhandled_exception_handler(nullptr),
get_lang_stack(nullptr),
kill_main(nullptr),
ref_counting_enabled(false),
is_local_mode(false),
num_workers(0),
terminate_asyncio_thread(nullptr),
@ -169,8 +168,6 @@ struct CoreWorkerOptions {
std::function<void(std::string *)> get_lang_stack;
// Function that tries to interrupt the currently running Python thread.
std::function<bool()> kill_main;
/// Whether to enable object ref counting.
bool ref_counting_enabled;
/// Is local mode being used.
bool is_local_mode;
/// The number of workers to be started in the current process.
@ -427,7 +424,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
std::vector<ObjectID> deleted;
reference_counter_->RemoveLocalReference(object_id, &deleted);
// TOOD(ilr): better way of keeping an object from being deleted
if (options_.ref_counting_enabled && !options_.is_local_mode) {
if (!options_.is_local_mode) {
memory_store_->Delete(deleted);
}
}

View file

@ -244,7 +244,6 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeInitialize(
options.task_execution_callback = task_execution_callback;
options.on_worker_shutdown = on_worker_shutdown;
options.gc_collect = gc_collect;
options.ref_counting_enabled = true;
options.num_workers = static_cast<int>(numWorkersPerProcess);
options.serialized_job_config = serialized_job_config;
options.metrics_agent_port = -1;

View file

@ -145,7 +145,6 @@ class CoreWorkerTest : public ::testing::Test {
options.node_manager_port = node_manager_port;
options.raylet_ip_address = "127.0.0.1";
options.driver_name = "core_worker_test";
options.ref_counting_enabled = true;
options.num_workers = 1;
options.metrics_agent_port = -1;
CoreWorkerProcess::Initialize(options);

View file

@ -48,7 +48,6 @@ class MockWorker {
options.raylet_ip_address = "127.0.0.1";
options.task_execution_callback =
std::bind(&MockWorker::ExecuteTask, this, _1, _2, _3, _4, _5, _6, _7, _8, _9);
options.ref_counting_enabled = true;
options.num_workers = 1;
options.metrics_agent_port = -1;
CoreWorkerProcess::Initialize(options);

View file

@ -501,7 +501,6 @@ class StreamingWorker {
options.raylet_ip_address = "127.0.0.1";
options.task_execution_callback = std::bind(&StreamingWorker::ExecuteTask, this, _1,
_2, _3, _4, _5, _6, _7, _8, _9);
options.ref_counting_enabled = true;
options.num_workers = 1;
options.metrics_agent_port = -1;
CoreWorkerProcess::Initialize(options);

View file

@ -232,7 +232,6 @@ class StreamingQueueTestBase : public ::testing::TestWithParam<uint64_t> {
options.node_manager_port = node_manager_port_;
options.raylet_ip_address = "127.0.0.1";
options.driver_name = "queue_tests";
options.ref_counting_enabled = true;
options.num_workers = 1;
options.metrics_agent_port = -1;
InitShutdownRAII core_worker_raii(CoreWorkerProcess::Initialize,