Convert the raylet client (the code in local_scheduler_client.cc) to proper C++. (#3511)

* refactoring

* fix bugs

* create client class

* create client class for java; bug fix

* remove legacy code

* improve code by using std::string, std::unique_ptr rename private fields and removing legacy code

* rename class

* improve naming

* fix

* rename files

* fix names

* change name

* change return types

* make a mutex private field

* fix comments

* fix bugs

* lint

* bug fix

* bug fix

* move too short functions into the header file

* Loosen crash conditions for some APIs.

* Apply suggestions from code review

Co-Authored-By: suquark <suquark@gmail.com>

* format

* update

* rename python APIs

* fix java

* more fixes

* change types of cpython interface

* more fixes

* improve error processing

* improve error processing for java wrapper

* lint

* fix java

* make fields const

* use pointers for [out] parameters

* fix java & error msg

* fix resource leak, etc.
Authored by Si-Yuan on 2018-12-13 13:39:10 -08:00; committed by Philipp Moritz
parent 5dcc333199
commit 84fae57ab5
22 changed files with 739 additions and 828 deletions
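
Taken together, the changes below replace the free functions of local_scheduler_client.cc with a RayletClient class whose methods report failure through ray::Status instead of raw ints or hard crashes inside the client. A minimal usage sketch, inferred from the call sites in the diffs that follow (the constructor arguments and method names match those call sites, but the exact signatures are an approximation, not the authoritative raylet_client.h):

// Hypothetical caller, mirroring the JNI and CPython wrappers below.
RayletClient raylet_client(raylet_socket, worker_id, /*is_worker=*/true,
                           driver_id, Language::PYTHON);

// Operations now return ray::Status; the caller decides whether to crash.
ray::Status status =
    raylet_client.SubmitTask(execution_dependencies, task_spec);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to submit a task to raylet.");

// [out] parameters are passed as pointers; GetTask hands ownership of the
// assigned task back through a std::unique_ptr.
std::unique_ptr<ray::raylet::TaskSpecification> assigned_task;
RAY_CHECK_OK_PREPEND(raylet_client.GetTask(&assigned_task),
                     "[RayletClient] Failed to get a task from raylet.");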

View file

@@ -101,7 +101,7 @@ if ("${CMAKE_RAY_LANG_PYTHON}" STREQUAL "YES")
set(ray_file_list
"src/ray/thirdparty/redis/src/redis-server"
"src/ray/gcs/redis_module/libray_redis_module.so"
"src/ray/raylet/liblocal_scheduler_library_python.so"
"src/ray/raylet/libraylet_library_python.so"
"src/ray/raylet/raylet_monitor"
"src/ray/raylet/raylet")
@@ -128,8 +128,8 @@ if ("${CMAKE_RAY_LANG_PYTHON}" STREQUAL "YES")
add_dependencies(copy_ray copy_ray_files)
# Make sure that the Python extensions are built before copying the files.
get_local_scheduler_library("python" LOCAL_SCHEDULER_LIBRARY_PYTHON)
add_dependencies(copy_ray ${LOCAL_SCHEDULER_LIBRARY_PYTHON})
get_raylet_library("python" RAYLET_LIBRARY_PYTHON)
add_dependencies(copy_ray ${RAYLET_LIBRARY_PYTHON})
foreach(file ${ray_file_list})
add_custom_command(TARGET copy_ray POST_BUILD
@@ -146,8 +146,8 @@ if ("${CMAKE_RAY_LANG_PYTHON}" STREQUAL "YES")
endif()
if ("${CMAKE_RAY_LANG_JAVA}" STREQUAL "YES")
get_local_scheduler_library("java" LOCAL_SCHEDULER_LIBRARY_JAVA)
add_dependencies(copy_ray ${LOCAL_SCHEDULER_LIBRARY_JAVA})
get_raylet_library("java" RAYLET_LIBRARY_JAVA)
add_dependencies(copy_ray ${RAYLET_LIBRARY_JAVA})
# copy libplasma_java files
add_custom_command(TARGET copy_ray POST_BUILD

View file

@@ -50,7 +50,7 @@ declare -a nativeBinaries=(
declare -a nativeLibraries=(
"./src/ray/gcs/redis_module/libray_redis_module.so"
"./src/ray/raylet/liblocal_scheduler_library_java.*"
"./src/ray/raylet/libraylet_library_java.*"
"./src/plasma/libplasma_java.*"
"./src/ray/raylet/*lib.a"
)

View file

@@ -61,7 +61,7 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
// Load native libraries.
try {
resetLibaryPath();
System.loadLibrary("local_scheduler_library_java");
System.loadLibrary("raylet_library_java");
System.loadLibrary("plasma_java");
} catch (Exception e) {
LOGGER.error("Failed to load native libraries.", e);

View file

@@ -791,7 +791,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus,
# Disconnect the worker from the local scheduler. The point of
# this is so that when the worker kills itself below, the local
# scheduler won't push an error message to the driver.
worker.local_scheduler_client.disconnect()
worker.raylet_client.disconnect()
sys.exit(0)
assert False, "This process should have terminated."
@@ -832,8 +832,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus,
# the local scheduler will not be included, and may not be runnable
# on checkpoint resumption.
actor_id = ray.ObjectID(worker.actor_id)
frontier = worker.local_scheduler_client.get_actor_frontier(
actor_id)
frontier = worker.raylet_client.get_actor_frontier(actor_id)
# Save the checkpoint in Redis. TODO(rkn): Checkpoints
# should not be stored in Redis. Fix this.
set_actor_checkpoint(worker, worker.actor_id, checkpoint_index,
@@ -863,7 +862,7 @@ def make_actor(cls, num_cpus, num_gpus, resources, actor_method_cpus,
# Set the number of tasks executed so far.
worker.actor_task_counter = checkpoint_index
# Set the actor frontier in the local scheduler.
worker.local_scheduler_client.set_actor_frontier(frontier)
worker.raylet_client.set_actor_frontier(frontier)
checkpoint_resumed = True
return checkpoint_resumed

View file

@@ -36,10 +36,10 @@ def warmup():
def fetch(oids):
local_sched_client = ray.worker.global_worker.local_scheduler_client
raylet_client = ray.worker.global_worker.raylet_client
for o in oids:
ray_obj_id = ray.ObjectID(o)
local_sched_client.fetch_or_reconstruct([ray_obj_id], True)
raylet_client.fetch_or_reconstruct([ray_obj_id], True)
def run_timeline(sess, ops, feed_dict=None, write_timeline=False, name=""):

View file

@@ -42,4 +42,4 @@ def free(object_ids, local_only=False, worker=None):
if len(object_ids) == 0:
return
worker.local_scheduler_client.free(object_ids, local_only)
worker.raylet_client.free_objects(object_ids, local_only)

View file

@@ -119,7 +119,7 @@ class Profiler(object):
else:
component_type = "driver"
self.worker.local_scheduler_client.push_profile_events(
self.worker.raylet_client.push_profile_events(
component_type, ray.ObjectID(self.worker.worker_id),
self.worker.node_ip_address, events)

View file

@@ -2,12 +2,12 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from ray.core.src.ray.raylet.liblocal_scheduler_library_python import (
Task, LocalSchedulerClient, ObjectID, check_simple_value, compute_task_id,
from ray.core.src.ray.raylet.libraylet_library_python import (
Task, RayletClient, ObjectID, check_simple_value, compute_task_id,
task_from_string, task_to_string, _config, common_error)
__all__ = [
"Task", "LocalSchedulerClient", "ObjectID", "check_simple_value",
"Task", "RayletClient", "ObjectID", "check_simple_value",
"compute_task_id", "task_from_string", "task_to_string",
"start_local_scheduler", "_config", "common_error"
]

View file

@@ -39,8 +39,8 @@ class TaskPool(object):
for worker, obj_id in self.completed():
plasma_id = ray.pyarrow.plasma.ObjectID(obj_id.id())
(ray.worker.global_worker.local_scheduler_client.
fetch_or_reconstruct([obj_id], True))
(ray.worker.global_worker.raylet_client.fetch_or_reconstruct(
[obj_id], True))
self._fetching.append((worker, obj_id))
remaining = []

View file

@@ -69,7 +69,7 @@ def push_error_to_driver(worker,
if driver_id is None:
driver_id = ray_constants.NIL_JOB_ID.id()
data = {} if data is None else data
worker.local_scheduler_client.push_error(
worker.raylet_client.push_error(
ray.ObjectID(driver_id), error_type, message, time.time())

View file

@@ -455,7 +455,7 @@ class Worker(object):
]
for i in range(0, len(object_ids),
ray._config.worker_fetch_request_size()):
self.local_scheduler_client.fetch_or_reconstruct(
self.raylet_client.fetch_or_reconstruct(
object_ids[i:(i + ray._config.worker_fetch_request_size())],
True)
@@ -490,7 +490,7 @@
ray._config.worker_fetch_request_size())
for i in range(0, len(object_ids_to_fetch),
fetch_request_size):
self.local_scheduler_client.fetch_or_reconstruct(
self.raylet_client.fetch_or_reconstruct(
ray_object_ids_to_fetch[i:(
i + fetch_request_size)], False,
current_task_id)
@@ -511,7 +511,7 @@
# If there were objects that we weren't able to get locally,
# let the local scheduler know that we're now unblocked.
self.local_scheduler_client.notify_unblocked(current_task_id)
self.raylet_client.notify_unblocked(current_task_id)
assert len(final_results) == len(object_ids)
return final_results
@@ -628,7 +628,7 @@
actor_creation_id, actor_creation_dummy_object_id, actor_id,
actor_handle_id, actor_counter, execution_dependencies,
resources, placement_resources)
self.local_scheduler_client.submit(task)
self.raylet_client.submit_task(task)
return task.returns()
@@ -936,7 +936,7 @@
reached_max_executions = (self.function_actor_manager.get_task_counter(
driver_id, function_id.id()) == execution_info.max_calls)
if reached_max_executions:
self.local_scheduler_client.disconnect()
self.raylet_client.disconnect()
sys.exit(0)
def _get_next_task_from_local_scheduler(self):
@@ -946,7 +946,7 @@
A task from the local scheduler.
"""
with profiling.profile("worker_idle", worker=self):
task = self.local_scheduler_client.get_task()
task = self.raylet_client.get_task()
# Automatically restrict the GPUs available to this task.
ray.utils.set_cuda_visible_devices(ray.get_gpu_ids())
@@ -982,7 +982,7 @@ def get_gpu_ids():
raise Exception("ray.get_gpu_ids() currently does not work in PYTHON "
"MODE.")
all_resource_ids = global_worker.local_scheduler_client.resource_ids()
all_resource_ids = global_worker.raylet_client.resource_ids()
assigned_ids = [
resource_id for resource_id, _ in all_resource_ids.get("GPU", [])
]
@@ -1010,7 +1010,7 @@ def get_resource_ids():
"ray.get_resource_ids() currently does not work in PYTHON "
"MODE.")
return global_worker.local_scheduler_client.resource_ids()
return global_worker.raylet_client.resource_ids()
def _webui_url_helper(client):
@@ -1733,8 +1733,8 @@ def shutdown(worker=global_worker):
will need to reload the module.
"""
disconnect(worker)
if hasattr(worker, "local_scheduler_client"):
del worker.local_scheduler_client
if hasattr(worker, "raylet_client"):
del worker.raylet_client
if hasattr(worker, "plasma_client"):
worker.plasma_client.disconnect()
@@ -2120,7 +2120,7 @@ def connect(info,
# multithreading per worker.
worker.multithreading_warned = False
worker.local_scheduler_client = ray.raylet.LocalSchedulerClient(
worker.raylet_client = ray.raylet.RayletClient(
raylet_socket, worker.worker_id, is_worker, worker.current_task_id)
# Start the import thread
@@ -2406,7 +2406,7 @@ def put(value, worker=global_worker):
if worker.mode == LOCAL_MODE:
# In LOCAL_MODE, ray.put is the identity operation.
return value
object_id = worker.local_scheduler_client.compute_put_id(
object_id = worker.raylet_client.compute_put_id(
worker.current_task_id, worker.put_index)
worker.put_object(object_id, value)
worker.put_index += 1
@@ -2486,7 +2486,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
current_task_id = worker.get_current_thread_task_id()
timeout = timeout if timeout is not None else 2**30
ready_ids, remaining_ids = worker.local_scheduler_client.wait(
ready_ids, remaining_ids = worker.raylet_client.wait(
object_ids, num_returns, timeout, False, current_task_id)
return ready_ids, remaining_ids

View file

@@ -23,7 +23,7 @@ ray_files = [
"ray/core/src/ray/thirdparty/redis/src/redis-server",
"ray/core/src/ray/gcs/redis_module/libray_redis_module.so",
"ray/core/src/plasma/plasma_store_server",
"ray/core/src/ray/raylet/liblocal_scheduler_library_python.so",
"ray/core/src/ray/raylet/libraylet_library_python.so",
"ray/core/src/ray/raylet/raylet_monitor", "ray/core/src/ray/raylet/raylet",
"ray/WebUI.ipynb"
]

View file

@@ -44,12 +44,12 @@ include_directories(${GCS_FBS_OUTPUT_DIRECTORY})
add_library(rayletlib raylet.cc ${NODE_MANAGER_FBS_OUTPUT_FILES})
target_link_libraries(rayletlib ray_static ${Boost_SYSTEM_LIBRARY})
add_library(local_scheduler_client STATIC local_scheduler_client.cc)
add_library(raylet_client STATIC raylet_client.cc)
# Encode the fact that some things require some autogenerated flatbuffer files
# to be created first.
add_dependencies(rayletlib gen_gcs_fbs)
add_dependencies(local_scheduler_client gen_gcs_fbs arrow_ep gen_node_manager_fbs)
add_dependencies(raylet_client gen_gcs_fbs arrow_ep gen_node_manager_fbs)
add_executable(raylet main.cc)
target_link_libraries(raylet rayletlib ${Boost_SYSTEM_LIBRARY} pthread)
@@ -61,35 +61,35 @@ install(FILES
raylet
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/ray/raylet")
macro(get_local_scheduler_library LANG VAR)
set(${VAR} "local_scheduler_library_${LANG}")
macro(get_raylet_library LANG VAR)
set(${VAR} "raylet_library_${LANG}")
endmacro()
macro(set_local_scheduler_library LANG)
get_local_scheduler_library(${LANG} LOCAL_SCHEDULER_LIBRARY_${LANG})
set(LOCAL_SCHEDULER_LIBRARY_LANG ${LOCAL_SCHEDULER_LIBRARY_${LANG}})
macro(set_raylet_library LANG)
get_raylet_library(${LANG} RAYLET_LIBRARY_${LANG})
set(RAYLET_LIBRARY_LANG ${RAYLET_LIBRARY_${LANG}})
file(GLOB LOCAL_SCHEDULER_LIBRARY_${LANG}_SRC
file(GLOB RAYLET_LIBRARY_${LANG}_SRC
lib/${LANG}/*.cc)
add_library(${LOCAL_SCHEDULER_LIBRARY_LANG} SHARED
${LOCAL_SCHEDULER_LIBRARY_${LANG}_SRC})
add_library(${RAYLET_LIBRARY_LANG} SHARED
${RAYLET_LIBRARY_${LANG}_SRC})
if(APPLE)
if ("${LANG}" STREQUAL "python")
SET_TARGET_PROPERTIES(${LOCAL_SCHEDULER_LIBRARY_LANG} PROPERTIES SUFFIX .so)
SET_TARGET_PROPERTIES(${RAYLET_LIBRARY_LANG} PROPERTIES SUFFIX .so)
endif()
target_link_libraries(${LOCAL_SCHEDULER_LIBRARY_LANG} "-undefined dynamic_lookup" local_scheduler_client ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY})
target_link_libraries(${RAYLET_LIBRARY_LANG} "-undefined dynamic_lookup" raylet_client ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY})
else(APPLE)
target_link_libraries(${LOCAL_SCHEDULER_LIBRARY_LANG} local_scheduler_client ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY})
target_link_libraries(${RAYLET_LIBRARY_LANG} raylet_client ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} ${Boost_SYSTEM_LIBRARY})
endif(APPLE)
add_dependencies(${LOCAL_SCHEDULER_LIBRARY_LANG} gen_node_manager_fbs)
add_dependencies(${RAYLET_LIBRARY_LANG} gen_node_manager_fbs)
install(TARGETS ${LOCAL_SCHEDULER_LIBRARY_LANG} DESTINATION ${CMAKE_SOURCE_DIR}/local_scheduler)
install(TARGETS ${RAYLET_LIBRARY_LANG} DESTINATION ${CMAKE_SOURCE_DIR}/raylet)
endmacro()
if ("${CMAKE_RAY_LANG_PYTHON}" STREQUAL "YES")
set_local_scheduler_library("python")
set_raylet_library("python")
include_directories("${PYTHON_INCLUDE_DIRS}")
include_directories("${NUMPY_INCLUDE_DIR}")
endif()
@@ -103,5 +103,5 @@ if ("${CMAKE_RAY_LANG_JAVA}" STREQUAL "YES")
else() # linux
add_compile_options("-I$ENV{JAVA_HOME}/include/linux")
endif()
set_local_scheduler_library("java")
set_raylet_library("java")
endif()

View file

@@ -118,7 +118,7 @@ table GetTaskReply {
}
// This struct is used to register a new worker with the local scheduler.
// It is shipped as part of local_scheduler_connect.
// It is shipped as part of raylet_connect.
table RegisterClientRequest {
// True if the client is a worker and false if the client is a driver.
is_worker: bool;

View file

@@ -3,7 +3,7 @@
#include <jni.h>
#include "ray/id.h"
#include "ray/raylet/local_scheduler_client.h"
#include "ray/raylet/raylet_client.h"
#include "ray/util/logging.h"
#ifdef __cplusplus
@@ -42,10 +42,10 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeInit(
UniqueIdFromJByteArray worker_id(env, workerId);
UniqueIdFromJByteArray driver_id(env, driverId);
const char *nativeString = env->GetStringUTFChars(sockName, JNI_FALSE);
auto client = LocalSchedulerConnection_init(nativeString, *worker_id.PID, isWorker,
*driver_id.PID, Language::JAVA);
auto raylet_client = new RayletClient(nativeString, *worker_id.PID, isWorker,
*driver_id.PID, Language::JAVA);
env->ReleaseStringUTFChars(sockName, nativeString);
return reinterpret_cast<jlong>(client);
return reinterpret_cast<jlong>(raylet_client);
}
/*
@@ -56,7 +56,7 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeInit(
JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmitTask(
JNIEnv *env, jclass, jlong client, jbyteArray cursorId, jobject taskBuff, jint pos,
jint taskSize) {
auto conn = reinterpret_cast<LocalSchedulerConnection *>(client);
auto raylet_client = reinterpret_cast<RayletClient *>(client);
std::vector<ObjectID> execution_dependencies;
if (cursorId != nullptr) {
@@ -66,7 +66,8 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmit
auto data = reinterpret_cast<char *>(env->GetDirectBufferAddress(taskBuff)) + pos;
ray::raylet::TaskSpecification task_spec(std::string(data, taskSize));
local_scheduler_submit_raylet(conn, execution_dependencies, task_spec);
auto status = raylet_client->SubmitTask(execution_dependencies, task_spec);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to submit a task to raylet.");
}
/*
@@ -76,10 +77,12 @@ JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeSubmit
*/
JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeGetTask(
JNIEnv *env, jclass, jlong client) {
auto conn = reinterpret_cast<LocalSchedulerConnection *>(client);
auto raylet_client = reinterpret_cast<RayletClient *>(client);
// TODO: handle actor failure later
ray::raylet::TaskSpecification *spec = local_scheduler_get_task_raylet(conn);
std::unique_ptr<ray::raylet::TaskSpecification> spec;
auto status = raylet_client->GetTask(&spec);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to get a task from raylet.");
// We serialize the task specification using flatbuffers and then parse the
// resulting string. This awkwardness is due to the fact that the Java
@@ -100,7 +103,6 @@ JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_native
result, 0, task_message->size(),
reinterpret_cast<jbyte *>(const_cast<char *>(task_message->data())));
delete spec;
return result;
}
@@ -111,9 +113,10 @@ JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_native
*/
JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeDestroy(
JNIEnv *, jclass, jlong client) {
auto conn = reinterpret_cast<LocalSchedulerConnection *>(client);
local_scheduler_disconnect_client(conn);
LocalSchedulerConnection_free(conn);
auto raylet_client = reinterpret_cast<RayletClient *>(client);
RAY_CHECK_OK_PREPEND(raylet_client->Disconnect(),
"[RayletClient] Failed to disconnect.");
delete raylet_client;
}
/*
@@ -135,9 +138,10 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeFetchOrReconstruct(
env->DeleteLocalRef(object_id_bytes);
}
UniqueIdFromJByteArray current_task_id(env, currentTaskId);
auto conn = reinterpret_cast<LocalSchedulerConnection *>(client);
return local_scheduler_fetch_or_reconstruct(conn, object_ids, fetchOnly,
*current_task_id.PID);
auto raylet_client = reinterpret_cast<RayletClient *>(client);
auto status =
raylet_client->FetchOrReconstruct(object_ids, fetchOnly, *current_task_id.PID);
return static_cast<jint>(status.code());
}
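
Unlike the surrounding JNI wrappers, which RAY_CHECK on a bad status, nativeFetchOrReconstruct hands the status code back to Java as a jint so the caller can decide how to react; this appears to be one of the "Loosen crash conditions for some APIs" changes from the commit message.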
/*
@@ -148,8 +152,9 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeFetchOrReconstruct(
JNIEXPORT void JNICALL Java_org_ray_runtime_raylet_RayletClientImpl_nativeNotifyUnblocked(
JNIEnv *env, jclass, jlong client, jbyteArray currentTaskId) {
UniqueIdFromJByteArray current_task_id(env, currentTaskId);
auto conn = reinterpret_cast<LocalSchedulerConnection *>(client);
local_scheduler_notify_unblocked(conn, *current_task_id.PID);
auto raylet_client = reinterpret_cast<RayletClient *>(client);
auto status = raylet_client->NotifyUnblocked(*current_task_id.PID);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to notify unblocked.");
}
/*
@@ -172,12 +177,14 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeWaitObject(
}
UniqueIdFromJByteArray current_task_id(env, currentTaskId);
auto conn = reinterpret_cast<LocalSchedulerConnection *>(client);
auto raylet_client = reinterpret_cast<RayletClient *>(client);
// Invoke wait.
std::pair<std::vector<ObjectID>, std::vector<ObjectID>> result =
local_scheduler_wait(conn, object_ids, numReturns, timeoutMillis,
static_cast<bool>(isWaitLocal), *current_task_id.PID);
WaitResultPair result;
auto status =
raylet_client->Wait(object_ids, numReturns, timeoutMillis,
static_cast<bool>(isWaitLocal), *current_task_id.PID, &result);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to wait for objects.");
// Convert result to java object.
jboolean put_value = true;
@@ -246,8 +253,9 @@ Java_org_ray_runtime_raylet_RayletClientImpl_nativeFreePlasmaObjects(
object_ids.push_back(*object_id.PID);
env->DeleteLocalRef(object_id_bytes);
}
auto conn = reinterpret_cast<LocalSchedulerConnection *>(client);
local_scheduler_free_objects_in_object_store(conn, object_ids, localOnly);
auto raylet_client = reinterpret_cast<RayletClient *>(client);
auto status = raylet_client->FreeObjects(object_ids, localOnly);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to free objects.");
}
#ifdef __cplusplus

View file

@@ -117,7 +117,7 @@ TaskSpec *TaskSpec_copy(TaskSpec *spec, int64_t task_spec_size) {
*
* This is called from Python like
*
* task = local_scheduler.task_from_string("...")
* task = raylet.task_from_string("...")
*
* @param task_string String representation of the task specification.
* @return Python task specification object.
@@ -142,7 +142,7 @@ PyObject *PyTask_from_string(PyObject *self, PyObject *args) {
*
* This is called from Python like
*
* s = local_scheduler.task_to_string(task)
* s = raylet.task_to_string(task)
*
* @param task Ray task specification Python object.
* @return String representing the task specification.

View file

@@ -3,78 +3,73 @@
#include "common_extension.h"
#include "config_extension.h"
#include "ray/raylet/local_scheduler_client.h"
#include "ray/raylet/raylet_client.h"
PyObject *LocalSchedulerError;
// clang-format off
typedef struct {
PyObject_HEAD
LocalSchedulerConnection *local_scheduler_connection;
} PyLocalSchedulerClient;
RayletClient *raylet_client;
} PyRayletClient;
// clang-format on
static int PyLocalSchedulerClient_init(PyLocalSchedulerClient *self, PyObject *args,
PyObject *kwds) {
static int PyRayletClient_init(PyRayletClient *self, PyObject *args, PyObject *kwds) {
char *socket_name;
UniqueID client_id;
PyObject *is_worker;
JobID driver_id;
if (!PyArg_ParseTuple(args, "sO&OO&", &socket_name, PyStringToUniqueID, &client_id,
&is_worker, &PyObjectToUniqueID, &driver_id)) {
self->local_scheduler_connection = NULL;
self->raylet_client = NULL;
return -1;
}
/* Connect to the local scheduler. */
self->local_scheduler_connection = LocalSchedulerConnection_init(
socket_name, client_id, static_cast<bool>(PyObject_IsTrue(is_worker)), driver_id,
Language::PYTHON);
self->raylet_client = new RayletClient(socket_name, client_id,
static_cast<bool>(PyObject_IsTrue(is_worker)),
driver_id, Language::PYTHON);
return 0;
}
static void PyLocalSchedulerClient_dealloc(PyLocalSchedulerClient *self) {
if (self->local_scheduler_connection != NULL) {
LocalSchedulerConnection_free(self->local_scheduler_connection);
static void PyRayletClient_dealloc(PyRayletClient *self) {
if (self->raylet_client != NULL) {
delete self->raylet_client;
}
Py_TYPE(self)->tp_free((PyObject *)self);
}
static PyObject *PyLocalSchedulerClient_disconnect(PyObject *self) {
local_scheduler_disconnect_client(
((PyLocalSchedulerClient *)self)->local_scheduler_connection);
static PyObject *PyRayletClient_Disconnect(PyRayletClient *self) {
auto status = self->raylet_client->Disconnect();
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to disconnect.");
Py_RETURN_NONE;
}
static PyObject *PyLocalSchedulerClient_submit(PyObject *self, PyObject *args) {
static PyObject *PyRayletClient_SubmitTask(PyRayletClient *self, PyObject *args) {
PyObject *py_task;
if (!PyArg_ParseTuple(args, "O", &py_task)) {
return NULL;
}
LocalSchedulerConnection *connection =
reinterpret_cast<PyLocalSchedulerClient *>(self)->local_scheduler_connection;
PyTask *task = reinterpret_cast<PyTask *>(py_task);
local_scheduler_submit_raylet(connection, *task->execution_dependencies,
*task->task_spec);
auto status =
self->raylet_client->SubmitTask(*task->execution_dependencies, *task->task_spec);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to submit a task to raylet.");
Py_RETURN_NONE;
}
// clang-format off
static PyObject *PyLocalSchedulerClient_get_task(PyObject *self) {
ray::raylet::TaskSpecification *task_spec;
static PyObject *PyRayletClient_GetTask(PyRayletClient *self) {
std::unique_ptr<ray::raylet::TaskSpecification> task_spec;
/* Drop the global interpreter lock while we get a task because
* local_scheduler_get_task may block for a long time. */
* raylet_GetTask may block for a long time. */
Py_BEGIN_ALLOW_THREADS
task_spec = local_scheduler_get_task_raylet(
reinterpret_cast<PyLocalSchedulerClient *>(self)->local_scheduler_connection);
auto status = self->raylet_client->GetTask(&task_spec);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to get a task from raylet.");
Py_END_ALLOW_THREADS
return PyTask_make(task_spec);
return PyTask_make(task_spec.release());
}
// clang-format on
static PyObject *PyLocalSchedulerClient_fetch_or_reconstruct(PyObject *self,
PyObject *args) {
static PyObject *PyRayletClient_FetchOrReconstruct(PyRayletClient *self, PyObject *args) {
PyObject *py_object_ids;
PyObject *py_fetch_only;
std::vector<ObjectID> object_ids;
@@ -93,32 +88,31 @@ static PyObject *PyLocalSchedulerClient_fetch_or_reconstruct(PyObject *self,
}
object_ids.push_back(object_id);
}
int ret = local_scheduler_fetch_or_reconstruct(
reinterpret_cast<PyLocalSchedulerClient *>(self)->local_scheduler_connection,
object_ids, fetch_only, current_task_id);
if (ret == 0) {
auto status =
self->raylet_client->FetchOrReconstruct(object_ids, fetch_only, current_task_id);
if (status.ok()) {
Py_RETURN_NONE;
} else {
std::ostringstream stream;
stream << "local_scheduler_fetch_or_reconstruct failed: "
<< "local scheduler connection may be closed, "
<< "check raylet status. return value: " << ret;
stream << "[RayletClient] FetchOrReconstruct failed: "
<< "raylet client may be closed, check raylet status. error message: "
<< status.ToString();
PyErr_SetString(CommonError, stream.str().c_str());
Py_RETURN_NONE;
}
}
static PyObject *PyLocalSchedulerClient_notify_unblocked(PyObject *self, PyObject *args) {
static PyObject *PyRayletClient_NotifyUnblocked(PyRayletClient *self, PyObject *args) {
TaskID current_task_id;
if (!PyArg_ParseTuple(args, "O&", &PyObjectToUniqueID, &current_task_id)) {
return NULL;
}
local_scheduler_notify_unblocked(
((PyLocalSchedulerClient *)self)->local_scheduler_connection, current_task_id);
auto status = self->raylet_client->NotifyUnblocked(current_task_id);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to notify unblocked.");
Py_RETURN_NONE;
}
static PyObject *PyLocalSchedulerClient_compute_put_id(PyObject *self, PyObject *args) {
static PyObject *PyRayletClient_compute_put_id(PyObject *self, PyObject *args) {
int put_index;
TaskID task_id;
if (!PyArg_ParseTuple(args, "O&i", &PyObjectToUniqueID, &task_id, &put_index)) {
@@ -128,25 +122,10 @@ static PyObject *PyLocalSchedulerClient_compute_put_id(PyObject *self, PyObject
return PyObjectID_make(put_id);
}
static PyObject *PyLocalSchedulerClient_gpu_ids(PyObject *self) {
/* Construct a Python list of GPU IDs. */
std::vector<int> gpu_ids =
((PyLocalSchedulerClient *)self)->local_scheduler_connection->gpu_ids;
int num_gpu_ids = gpu_ids.size();
PyObject *gpu_ids_list = PyList_New((Py_ssize_t)num_gpu_ids);
for (int i = 0; i < num_gpu_ids; ++i) {
PyList_SetItem(gpu_ids_list, i, PyLong_FromLong(gpu_ids[i]));
}
return gpu_ids_list;
}
// NOTE(rkn): This function only makes sense for the raylet code path.
static PyObject *PyLocalSchedulerClient_resource_ids(PyObject *self) {
static PyObject *PyRayletClient_resource_ids(PyRayletClient *self) {
// Construct a Python dictionary of resource IDs and resource fractions.
PyObject *resource_ids = PyDict_New();
for (auto const &resource_info : reinterpret_cast<PyLocalSchedulerClient *>(self)
->local_scheduler_connection->resource_ids_) {
for (auto const &resource_info : self->raylet_client->GetResourceIDs()) {
auto const &resource_name = resource_info.first;
auto const &ids_and_fractions = resource_info.second;
@@ -171,7 +150,7 @@ static PyObject *PyLocalSchedulerClient_resource_ids(PyObject *self) {
return resource_ids;
}
static PyObject *PyLocalSchedulerClient_wait(PyObject *self, PyObject *args) {
static PyObject *PyRayletClient_Wait(PyRayletClient *self, PyObject *args) {
PyObject *py_object_ids;
int num_returns;
int64_t timeout_ms;
@@ -205,9 +184,10 @@ static PyObject *PyLocalSchedulerClient_wait(PyObject *self, PyObject *args) {
}
// Invoke wait.
std::pair<std::vector<ObjectID>, std::vector<ObjectID>> result = local_scheduler_wait(
reinterpret_cast<PyLocalSchedulerClient *>(self)->local_scheduler_connection,
object_ids, num_returns, timeout_ms, wait_local, current_task_id);
WaitResultPair result;
auto status = self->raylet_client->Wait(object_ids, num_returns, timeout_ms, wait_local,
current_task_id, &result);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to wait for objects.");
// Convert result to py object.
PyObject *py_found = PyList_New(static_cast<Py_ssize_t>(result.first.size()));
@@ -221,7 +201,7 @@ static PyObject *PyLocalSchedulerClient_wait(PyObject *self, PyObject *args) {
return Py_BuildValue("(OO)", py_found, py_remaining);
}
static PyObject *PyLocalSchedulerClient_push_error(PyObject *self, PyObject *args) {
static PyObject *PyRayletClient_PushError(PyRayletClient *self, PyObject *args) {
JobID job_id;
const char *type;
int type_length;
@@ -234,11 +214,10 @@ static PyObject *PyLocalSchedulerClient_push_error(PyObject *self, PyObject *arg
return NULL;
}
local_scheduler_push_error(
reinterpret_cast<PyLocalSchedulerClient *>(self)->local_scheduler_connection,
auto status = self->raylet_client->PushError(
job_id, std::string(type, type_length),
std::string(error_message, error_message_length), timestamp);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to push errors to raylet.");
Py_RETURN_NONE;
}
@@ -258,8 +237,7 @@ int PyBytes_or_PyUnicode_to_string(PyObject *py_string, std::string &out) {
return 0;
}
static PyObject *PyLocalSchedulerClient_push_profile_events(PyObject *self,
PyObject *args) {
static PyObject *PyRayletClient_PushProfileEvents(PyRayletClient *self, PyObject *args) {
const char *component_type;
int component_type_length;
UniqueID component_id;
@@ -331,14 +309,12 @@ static PyObject *PyLocalSchedulerClient_push_profile_events(PyObject *self,
profile_info.profile_events.emplace_back(new ProfileEventT(profile_event));
}
local_scheduler_push_profile_events(
reinterpret_cast<PyLocalSchedulerClient *>(self)->local_scheduler_connection,
profile_info);
auto status = self->raylet_client->PushProfileEvents(profile_info);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to push profile events to raylet.");
Py_RETURN_NONE;
}
static PyObject *PyLocalSchedulerClient_free(PyObject *self, PyObject *args) {
static PyObject *PyRayletClient_FreeObjects(PyRayletClient *self, PyObject *args) {
PyObject *py_object_ids;
PyObject *py_local_only;
@@ -367,83 +343,80 @@ static PyObject *PyLocalSchedulerClient_free(PyObject *self, PyObject *args) {
object_ids.push_back(object_id);
}
// Invoke local_scheduler_free_objects_in_object_store.
local_scheduler_free_objects_in_object_store(
reinterpret_cast<PyLocalSchedulerClient *>(self)->local_scheduler_connection,
object_ids, local_only);
// Invoke raylet_FreeObjects.
auto status = self->raylet_client->FreeObjects(object_ids, local_only);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Failed to free objects.");
Py_RETURN_NONE;
}
static PyMethodDef PyLocalSchedulerClient_methods[] = {
{"disconnect", (PyCFunction)PyLocalSchedulerClient_disconnect, METH_NOARGS,
static PyMethodDef PyRayletClient_methods[] = {
{"disconnect", (PyCFunction)PyRayletClient_Disconnect, METH_NOARGS,
"Notify the local scheduler that this client is exiting gracefully."},
{"submit", (PyCFunction)PyLocalSchedulerClient_submit, METH_VARARGS,
{"submit_task", (PyCFunction)PyRayletClient_SubmitTask, METH_VARARGS,
"Submit a task to the local scheduler."},
{"get_task", (PyCFunction)PyLocalSchedulerClient_get_task, METH_NOARGS,
{"get_task", (PyCFunction)PyRayletClient_GetTask, METH_NOARGS,
"Get a task from the local scheduler."},
{"fetch_or_reconstruct", (PyCFunction)PyLocalSchedulerClient_fetch_or_reconstruct,
METH_VARARGS, "Ask the local scheduler to reconstruct an object."},
{"notify_unblocked", (PyCFunction)PyLocalSchedulerClient_notify_unblocked,
METH_VARARGS, "Notify the local scheduler that we are unblocked."},
{"compute_put_id", (PyCFunction)PyLocalSchedulerClient_compute_put_id, METH_VARARGS,
{"fetch_or_reconstruct", (PyCFunction)PyRayletClient_FetchOrReconstruct, METH_VARARGS,
"Ask the local scheduler to reconstruct an object."},
{"notify_unblocked", (PyCFunction)PyRayletClient_NotifyUnblocked, METH_VARARGS,
"Notify the local scheduler that we are unblocked."},
{"compute_put_id", (PyCFunction)PyRayletClient_compute_put_id, METH_VARARGS,
"Return the object ID for a put call within a task."},
{"gpu_ids", (PyCFunction)PyLocalSchedulerClient_gpu_ids, METH_NOARGS,
"Get the IDs of the GPUs that are reserved for this client."},
{"resource_ids", (PyCFunction)PyLocalSchedulerClient_resource_ids, METH_NOARGS,
{"resource_ids", (PyCFunction)PyRayletClient_resource_ids, METH_NOARGS,
"Get the IDs of the resources that are reserved for this client."},
{"wait", (PyCFunction)PyLocalSchedulerClient_wait, METH_VARARGS,
{"wait", (PyCFunction)PyRayletClient_Wait, METH_VARARGS,
"Wait for a list of objects to be created."},
{"push_error", (PyCFunction)PyLocalSchedulerClient_push_error, METH_VARARGS,
{"push_error", (PyCFunction)PyRayletClient_PushError, METH_VARARGS,
"Push an error message to the relevant driver."},
{"push_profile_events", (PyCFunction)PyLocalSchedulerClient_push_profile_events,
METH_VARARGS, "Store some profiling events in the GCS."},
{"free", (PyCFunction)PyLocalSchedulerClient_free, METH_VARARGS,
{"push_profile_events", (PyCFunction)PyRayletClient_PushProfileEvents, METH_VARARGS,
"Store some profiling events in the GCS."},
{"free_objects", (PyCFunction)PyRayletClient_FreeObjects, METH_VARARGS,
"Free a list of objects from object stores."},
{NULL} /* Sentinel */
};
static PyTypeObject PyLocalSchedulerClientType = {
PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */
"local_scheduler.LocalSchedulerClient", /* tp_name */
sizeof(PyLocalSchedulerClient), /* tp_basicsize */
0, /* tp_itemsize */
(destructor)PyLocalSchedulerClient_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT, /* tp_flags */
"LocalSchedulerClient object", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
PyLocalSchedulerClient_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)PyLocalSchedulerClient_init, /* tp_init */
0, /* tp_alloc */
PyType_GenericNew, /* tp_new */
static PyTypeObject PyRayletClientType = {
PyVarObject_HEAD_INIT(NULL, 0) /* ob_size */
"raylet.RayletClient", /* tp_name */
sizeof(PyRayletClient), /* tp_basicsize */
0, /* tp_itemsize */
(destructor)PyRayletClient_dealloc, /* tp_dealloc */
0, /* tp_print */
0, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
Py_TPFLAGS_DEFAULT, /* tp_flags */
"RayletClient object", /* tp_doc */
0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
0, /* tp_iter */
0, /* tp_iternext */
PyRayletClient_methods, /* tp_methods */
0, /* tp_members */
0, /* tp_getset */
0, /* tp_base */
0, /* tp_dict */
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
(initproc)PyRayletClient_init, /* tp_init */
0, /* tp_alloc */
PyType_GenericNew, /* tp_new */
};
static PyMethodDef local_scheduler_methods[] = {
static PyMethodDef raylet_methods[] = {
{"check_simple_value", check_simple_value, METH_VARARGS,
"Should the object be passed by value?"},
{"compute_task_id", compute_task_id, METH_VARARGS,
@@ -459,14 +432,14 @@ static PyMethodDef local_scheduler_methods[] = {
#if PY_MAJOR_VERSION >= 3
static struct PyModuleDef moduledef = {
PyModuleDef_HEAD_INIT,
"liblocal_scheduler", /* m_name */
"A module for the local scheduler.", /* m_doc */
0, /* m_size */
local_scheduler_methods, /* m_methods */
NULL, /* m_reload */
NULL, /* m_traverse */
NULL, /* m_clear */
NULL, /* m_free */
"libraylet", /* m_name */
"A module for the raylet.", /* m_doc */
0, /* m_size */
raylet_methods, /* m_methods */
NULL, /* m_reload */
NULL, /* m_traverse */
NULL, /* m_clear */
NULL, /* m_free */
};
#endif
@@ -486,7 +459,7 @@ static struct PyModuleDef moduledef = {
#define MOD_INIT(name) PyMODINIT_FUNC init##name(void)
#endif
MOD_INIT(liblocal_scheduler_library_python) {
MOD_INIT(libraylet_library_python) {
if (PyType_Ready(&PyTaskType) < 0) {
INITERROR;
}
@@ -495,7 +468,7 @@ MOD_INIT(liblocal_scheduler_library_python) {
INITERROR;
}
if (PyType_Ready(&PyLocalSchedulerClientType) < 0) {
if (PyType_Ready(&PyRayletClientType) < 0) {
INITERROR;
}
@@ -506,9 +479,8 @@ MOD_INIT(liblocal_scheduler_library_python) {
#if PY_MAJOR_VERSION >= 3
PyObject *m = PyModule_Create(&moduledef);
#else
PyObject *m =
Py_InitModule3("liblocal_scheduler_library_python", local_scheduler_methods,
"A module for the local scheduler.");
PyObject *m = Py_InitModule3("libraylet_library_python", raylet_methods,
"A module for the raylet.");
#endif
init_numpy_module();
@@ -520,8 +492,8 @@ MOD_INIT(liblocal_scheduler_library_python) {
Py_INCREF(&PyObjectIDType);
PyModule_AddObject(m, "ObjectID", (PyObject *)&PyObjectIDType);
Py_INCREF(&PyLocalSchedulerClientType);
PyModule_AddObject(m, "LocalSchedulerClient", (PyObject *)&PyLocalSchedulerClientType);
Py_INCREF(&PyRayletClientType);
PyModule_AddObject(m, "RayletClient", (PyObject *)&PyRayletClientType);
char common_error[] = "common.error";
CommonError = PyErr_NewException(common_error, NULL, NULL);

View file

@@ -1,414 +0,0 @@
#include "local_scheduler_client.h"
#include <inttypes.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include "ray/common/common_protocol.h"
#include "ray/ray_config.h"
#include "ray/raylet/format/node_manager_generated.h"
#include "ray/raylet/task_spec.h"
#include "ray/util/logging.h"
using MessageType = ray::protocol::MessageType;
// TODO(rkn): The io methods below should be removed.
int connect_ipc_sock(const char *socket_pathname) {
struct sockaddr_un socket_address;
int socket_fd;
socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (socket_fd < 0) {
RAY_LOG(ERROR) << "socket() failed for pathname " << socket_pathname;
return -1;
}
memset(&socket_address, 0, sizeof(socket_address));
socket_address.sun_family = AF_UNIX;
if (strlen(socket_pathname) + 1 > sizeof(socket_address.sun_path)) {
RAY_LOG(ERROR) << "Socket pathname is too long.";
return -1;
}
strncpy(socket_address.sun_path, socket_pathname, strlen(socket_pathname) + 1);
if (connect(socket_fd, (struct sockaddr *)&socket_address, sizeof(socket_address)) !=
0) {
close(socket_fd);
return -1;
}
return socket_fd;
}
int connect_ipc_sock_retry(const char *socket_pathname, int num_retries,
int64_t timeout) {
/* Pick the default values if the user did not specify. */
if (num_retries < 0) {
num_retries = RayConfig::instance().num_connect_attempts();
}
if (timeout < 0) {
timeout = RayConfig::instance().connect_timeout_milliseconds();
}
RAY_CHECK(socket_pathname);
int fd = -1;
for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) {
fd = connect_ipc_sock(socket_pathname);
if (fd >= 0) {
break;
}
if (num_attempts > 0) {
RAY_LOG(ERROR) << "Retrying to connect to socket for pathname " << socket_pathname
<< " (num_attempts = " << num_attempts
<< ", num_retries = " << num_retries << ")";
}
/* Sleep for timeout milliseconds. */
usleep(timeout * 1000);
}
/* If we could not connect to the socket, exit. */
if (fd == -1) {
RAY_LOG(FATAL) << "Could not connect to socket " << socket_pathname;
}
return fd;
}
int read_bytes(int fd, uint8_t *cursor, size_t length) {
ssize_t nbytes = 0;
/* Termination condition: EOF or read 'length' bytes total. */
size_t bytesleft = length;
size_t offset = 0;
while (bytesleft > 0) {
nbytes = read(fd, cursor + offset, bytesleft);
if (nbytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
}
return -1; /* Errno will be set. */
} else if (0 == nbytes) {
/* Encountered early EOF. */
return -1;
}
RAY_CHECK(nbytes > 0);
bytesleft -= nbytes;
offset += nbytes;
}
return 0;
}
void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) {
int64_t version;
int closed = read_bytes(fd, (uint8_t *)&version, sizeof(version));
if (closed) {
goto disconnected;
}
RAY_CHECK(version == RayConfig::instance().ray_protocol_version());
closed = read_bytes(fd, (uint8_t *)type, sizeof(*type));
if (closed) {
goto disconnected;
}
closed = read_bytes(fd, (uint8_t *)length, sizeof(*length));
if (closed) {
goto disconnected;
}
*bytes = (uint8_t *)malloc(*length * sizeof(uint8_t));
closed = read_bytes(fd, *bytes, *length);
if (closed) {
free(*bytes);
goto disconnected;
}
return;
disconnected:
/* Handle the case in which the socket is closed. */
*type = static_cast<int64_t>(MessageType::DisconnectClient);
*length = 0;
*bytes = NULL;
return;
}
int write_bytes(int fd, uint8_t *cursor, size_t length) {
ssize_t nbytes = 0;
size_t bytesleft = length;
size_t offset = 0;
while (bytesleft > 0) {
/* While we haven't written the whole message, write to the file
* descriptor, advance the cursor, and decrease the amount left to write. */
nbytes = write(fd, cursor + offset, bytesleft);
if (nbytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
}
return -1; /* Errno will be set. */
} else if (0 == nbytes) {
/* Encountered early EOF. */
return -1;
}
RAY_CHECK(nbytes > 0);
bytesleft -= nbytes;
offset += nbytes;
}
return 0;
}
int do_write_message(int fd, int64_t type, int64_t length, uint8_t *bytes) {
int64_t version = RayConfig::instance().ray_protocol_version();
int closed;
closed = write_bytes(fd, (uint8_t *)&version, sizeof(version));
if (closed) {
return closed;
}
closed = write_bytes(fd, (uint8_t *)&type, sizeof(type));
if (closed) {
return closed;
}
closed = write_bytes(fd, (uint8_t *)&length, sizeof(length));
if (closed) {
return closed;
}
closed = write_bytes(fd, bytes, length * sizeof(char));
if (closed) {
return closed;
}
return 0;
}
int write_message(int fd, int64_t type, int64_t length, uint8_t *bytes,
std::mutex *mutex) {
if (mutex != NULL) {
std::unique_lock<std::mutex> guard(*mutex);
return do_write_message(fd, type, length, bytes);
} else {
return do_write_message(fd, type, length, bytes);
}
}
LocalSchedulerConnection *LocalSchedulerConnection_init(
const char *local_scheduler_socket, const UniqueID &client_id, bool is_worker,
const JobID &driver_id, const Language &language) {
LocalSchedulerConnection *result = new LocalSchedulerConnection();
result->conn = connect_ipc_sock_retry(local_scheduler_socket, -1, -1);
/* Register with the local scheduler.
* NOTE(swang): If the local scheduler exits and we are registered as a
* worker, we will get killed. */
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreateRegisterClientRequest(
fbb, is_worker, to_flatbuf(fbb, client_id), getpid(), to_flatbuf(fbb, driver_id),
language);
fbb.Finish(message);
/* Register the process ID with the local scheduler. */
int success = write_message(
result->conn, static_cast<int64_t>(MessageType::RegisterClientRequest),
fbb.GetSize(), fbb.GetBufferPointer(), &result->write_mutex);
RAY_CHECK(success == 0) << "Unable to register worker with local scheduler";
return result;
}
void LocalSchedulerConnection_free(LocalSchedulerConnection *conn) {
close(conn->conn);
delete conn;
}
void local_scheduler_disconnect_client(LocalSchedulerConnection *conn) {
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreateDisconnectClient(fbb);
fbb.Finish(message);
write_message(conn->conn,
static_cast<int64_t>(MessageType::IntentionalDisconnectClient),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
}
void local_scheduler_submit_raylet(LocalSchedulerConnection *conn,
const std::vector<ObjectID> &execution_dependencies,
const ray::raylet::TaskSpecification &task_spec) {
flatbuffers::FlatBufferBuilder fbb;
auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies);
auto message = ray::protocol::CreateSubmitTaskRequest(
fbb, execution_dependencies_message, task_spec.ToFlatbuffer(fbb));
fbb.Finish(message);
write_message(conn->conn, static_cast<int64_t>(MessageType::SubmitTask), fbb.GetSize(),
fbb.GetBufferPointer(), &conn->write_mutex);
}
ray::raylet::TaskSpecification *local_scheduler_get_task_raylet(
LocalSchedulerConnection *conn) {
int64_t type;
int64_t reply_size;
uint8_t *reply;
{
std::unique_lock<std::mutex> guard(conn->mutex);
write_message(conn->conn, static_cast<int64_t>(MessageType::GetTask), 0, NULL,
&conn->write_mutex);
// Receive a task from the local scheduler. This will block until the local
// scheduler gives this client a task.
read_message(conn->conn, &type, &reply_size, &reply);
}
if (type == static_cast<int64_t>(MessageType::DisconnectClient)) {
RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection.";
exit(1);
}
if (type != static_cast<int64_t>(MessageType::ExecuteTask)) {
RAY_LOG(FATAL) << "Problem communicating with raylet from worker: check logs or "
"dmesg for previous errors.";
}
// Parse the flatbuffer object.
auto reply_message = flatbuffers::GetRoot<ray::protocol::GetTaskReply>(reply);
// Set the resource IDs for this task.
conn->resource_ids_.clear();
for (size_t i = 0; i < reply_message->fractional_resource_ids()->size(); ++i) {
auto const &fractional_resource_ids =
reply_message->fractional_resource_ids()->Get(i);
auto &acquired_resources = conn->resource_ids_[string_from_flatbuf(
*fractional_resource_ids->resource_name())];
size_t num_resource_ids = fractional_resource_ids->resource_ids()->size();
size_t num_resource_fractions = fractional_resource_ids->resource_fractions()->size();
RAY_CHECK(num_resource_ids == num_resource_fractions);
RAY_CHECK(num_resource_ids > 0);
for (size_t j = 0; j < num_resource_ids; ++j) {
int64_t resource_id = fractional_resource_ids->resource_ids()->Get(j);
double resource_fraction = fractional_resource_ids->resource_fractions()->Get(j);
if (num_resource_ids > 1) {
int64_t whole_fraction = resource_fraction;
RAY_CHECK(whole_fraction == resource_fraction);
}
acquired_resources.push_back(std::make_pair(resource_id, resource_fraction));
}
}
ray::raylet::TaskSpecification *task_spec = new ray::raylet::TaskSpecification(
string_from_flatbuf(*reply_message->task_spec()));
// Free the original message from the local scheduler.
free(reply);
// Return the copy of the task spec and pass ownership to the caller.
return task_spec;
}
void local_scheduler_task_done(LocalSchedulerConnection *conn) {
write_message(conn->conn, static_cast<int64_t>(MessageType::TaskDone), 0, NULL,
&conn->write_mutex);
}
int local_scheduler_fetch_or_reconstruct(LocalSchedulerConnection *conn,
const std::vector<ObjectID> &object_ids,
bool fetch_only, const TaskID &current_task_id) {
flatbuffers::FlatBufferBuilder fbb;
auto object_ids_message = to_flatbuf(fbb, object_ids);
auto message = ray::protocol::CreateFetchOrReconstruct(
fbb, object_ids_message, fetch_only, to_flatbuf(fbb, current_task_id));
fbb.Finish(message);
return write_message(conn->conn, static_cast<int64_t>(MessageType::FetchOrReconstruct),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
}
void local_scheduler_notify_unblocked(LocalSchedulerConnection *conn,
const TaskID &current_task_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
ray::protocol::CreateNotifyUnblocked(fbb, to_flatbuf(fbb, current_task_id));
fbb.Finish(message);
write_message(conn->conn, static_cast<int64_t>(MessageType::NotifyUnblocked),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
}
std::pair<std::vector<ObjectID>, std::vector<ObjectID>> local_scheduler_wait(
LocalSchedulerConnection *conn, const std::vector<ObjectID> &object_ids,
int num_returns, int64_t timeout_milliseconds, bool wait_local,
const TaskID &current_task_id) {
// Write request.
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreateWaitRequest(
fbb, to_flatbuf(fbb, object_ids), num_returns, timeout_milliseconds, wait_local,
to_flatbuf(fbb, current_task_id));
fbb.Finish(message);
int64_t type;
int64_t reply_size;
uint8_t *reply;
{
std::unique_lock<std::mutex> guard(conn->mutex);
write_message(conn->conn,
static_cast<int64_t>(ray::protocol::MessageType::WaitRequest),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
// Read result.
read_message(conn->conn, &type, &reply_size, &reply);
}
if (static_cast<ray::protocol::MessageType>(type) !=
ray::protocol::MessageType::WaitReply) {
RAY_LOG(FATAL) << "Problem communicating with raylet from worker: check logs or "
"dmesg for previous errors.";
}
auto reply_message = flatbuffers::GetRoot<ray::protocol::WaitReply>(reply);
// Convert result.
std::pair<std::vector<ObjectID>, std::vector<ObjectID>> result;
auto found = reply_message->found();
for (uint i = 0; i < found->size(); i++) {
ObjectID object_id = ObjectID::from_binary(found->Get(i)->str());
result.first.push_back(object_id);
}
auto remaining = reply_message->remaining();
for (uint i = 0; i < remaining->size(); i++) {
ObjectID object_id = ObjectID::from_binary(remaining->Get(i)->str());
result.second.push_back(object_id);
}
/* Free the original message from the local scheduler. */
free(reply);
return result;
}
void local_scheduler_push_error(LocalSchedulerConnection *conn, const JobID &job_id,
const std::string &type, const std::string &error_message,
double timestamp) {
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreatePushErrorRequest(
fbb, to_flatbuf(fbb, job_id), fbb.CreateString(type),
fbb.CreateString(error_message), timestamp);
fbb.Finish(message);
write_message(conn->conn,
static_cast<int64_t>(ray::protocol::MessageType::PushErrorRequest),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
}
void local_scheduler_push_profile_events(LocalSchedulerConnection *conn,
const ProfileTableDataT &profile_events) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreateProfileTableData(fbb, &profile_events);
fbb.Finish(message);
write_message(conn->conn, static_cast<int64_t>(
ray::protocol::MessageType::PushProfileEventsRequest),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
}
void local_scheduler_free_objects_in_object_store(
LocalSchedulerConnection *conn, const std::vector<ray::ObjectID> &object_ids,
bool local_only) {
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreateFreeObjectsRequest(fbb, local_only,
to_flatbuf(fbb, object_ids));
fbb.Finish(message);
int success = write_message(
conn->conn,
static_cast<int64_t>(ray::protocol::MessageType::FreeObjectsInObjectStoreRequest),
fbb.GetSize(), fbb.GetBufferPointer(), &conn->write_mutex);
RAY_CHECK(success == 0) << "Failed to write message to raylet.";
}
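
In the deleted code above, every call threads &conn->write_mutex through write_message by hand. Per the "make a mutex private field" item in the commit message, the class version presumably hides both the socket fd and the locking behind private members and a private write helper, roughly like this (member and helper names are illustrative, not taken from the new file):

class RayletClient {
 private:
  int conn_;                // File descriptor of the Unix domain socket.
  std::mutex mutex_;        // Protects request/reply round trips.
  std::mutex write_mutex_;  // Serializes writers on the socket.

  // Locks write_mutex_ and forwards to the same framing code as before.
  int WriteMessage(MessageType type, flatbuffers::FlatBufferBuilder *fbb = nullptr) {
    std::unique_lock<std::mutex> guard(write_mutex_);
    return do_write_message(conn_, static_cast<int64_t>(type),
                            fbb ? fbb->GetSize() : 0,
                            fbb ? fbb->GetBufferPointer() : nullptr);
  }
};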

View file

@@ -1,185 +0,0 @@
#ifndef LOCAL_SCHEDULER_CLIENT_H
#define LOCAL_SCHEDULER_CLIENT_H
#include <mutex>
#include "ray/raylet/task_spec.h"
using ray::ObjectID;
using ray::JobID;
using ray::TaskID;
using ray::ActorID;
using ray::UniqueID;
struct LocalSchedulerConnection {
/** File descriptor of the Unix domain socket that connects to local
* scheduler. */
int conn;
/** The IDs of the GPUs that this client can use. NOTE(rkn): This is only used
* by legacy Ray and will be deprecated. */
std::vector<int> gpu_ids;
/// A map from resource name to the resource IDs that are currently reserved
/// for this worker. Each pair consists of the resource ID and the fraction
/// of that resource allocated for this worker.
std::unordered_map<std::string, std::vector<std::pair<int64_t, double>>> resource_ids_;
/// A mutex to protect stateful operations of the local scheduler client.
std::mutex mutex;
/// A mutex to protect write operations of the local scheduler client.
std::mutex write_mutex;
};
/**
* Connect to the local scheduler.
*
* @param local_scheduler_socket The name of the socket to use to connect to the
* local scheduler.
* @param worker_id A unique ID to represent the worker.
* @param is_worker Whether this client is a worker. If it is a worker, an
* additional message will be sent to register as one.
* @param driver_id The ID of the driver. This is non-nil if the client is a
* driver.
* @return The connection information.
*/
LocalSchedulerConnection *LocalSchedulerConnection_init(
const char *local_scheduler_socket, const UniqueID &worker_id, bool is_worker,
const JobID &driver_id, const Language &language);
/**
* Disconnect from the local scheduler.
*
* @param conn Local scheduler connection information returned by
* LocalSchedulerConnection_init.
* @return Void.
*/
void LocalSchedulerConnection_free(LocalSchedulerConnection *conn);
/// Submit a task using the raylet code path.
///
/// \param The connection information.
/// \param The execution dependencies.
/// \param The task specification.
/// \return Void.
void local_scheduler_submit_raylet(LocalSchedulerConnection *conn,
const std::vector<ObjectID> &execution_dependencies,
const ray::raylet::TaskSpecification &task_spec);
/**
* Notify the local scheduler that this client is disconnecting gracefully. This
* is used by actors to exit gracefully so that the local scheduler doesn't
* propagate an error message to the driver.
*
* @param conn The connection information.
* @return Void.
*/
void local_scheduler_disconnect_client(LocalSchedulerConnection *conn);
/// Get next task for this client. This will block until the scheduler assigns
/// a task to this worker. The caller takes ownership of the returned task
/// specification and must free it.
///
/// \param conn The connection information.
/// \return The assigned task.
ray::raylet::TaskSpecification *local_scheduler_get_task_raylet(
LocalSchedulerConnection *conn);
/**
* Tell the local scheduler that the client has finished executing a task.
*
* @param conn The connection information.
* @return Void.
*/
void local_scheduler_task_done(LocalSchedulerConnection *conn);
/**
* Tell the local scheduler to reconstruct or fetch objects.
*
* @param conn The connection information.
* @param object_ids The IDs of the objects to reconstruct.
* @param fetch_only Only fetch objects, do not reconstruct them.
* @param current_task_id The task that needs the objects.
* @return int 0 means correct, other numbers mean error.
*/
int local_scheduler_fetch_or_reconstruct(LocalSchedulerConnection *conn,
const std::vector<ObjectID> &object_ids,
bool fetch_only, const TaskID &current_task_id);
/**
* Notify the local scheduler that this client (worker) is no longer blocked.
*
* @param conn The connection information.
* @param current_task_id The task that is no longer blocked.
* @return Void.
*/
void local_scheduler_notify_unblocked(LocalSchedulerConnection *conn,
const TaskID &current_task_id);
// /**
// * Get an actor's current task frontier.
// *
// * @param conn The connection information.
// * @param actor_id The ID of the actor whose frontier is returned.
// * @return A byte vector that can be traversed as an ActorFrontier flatbuffer.
// */
// const std::vector<uint8_t> local_scheduler_get_actor_frontier(
// LocalSchedulerConnection *conn,
// ActorID actor_id);
// /**
// * Set an actor's current task frontier.
// *
// * @param conn The connection information.
// * @param frontier An ActorFrontier flatbuffer to set the frontier to.
// * @return Void.
// */
// void local_scheduler_set_actor_frontier(LocalSchedulerConnection *conn,
// const std::vector<uint8_t> &frontier);
/// Wait for the given objects until timeout expires or num_return objects are
/// found.
///
/// \param conn The connection information.
/// \param object_ids The objects to wait for.
/// \param num_returns The number of objects to wait for.
/// \param timeout_milliseconds Duration, in milliseconds, to wait before
/// returning.
/// \param wait_local Whether to wait for objects to appear on this node.
/// \param current_task_id The task that called wait.
/// \return A pair with the first element containing the object ids that were
/// found, and the second element the objects that were not found.
std::pair<std::vector<ObjectID>, std::vector<ObjectID>> local_scheduler_wait(
LocalSchedulerConnection *conn, const std::vector<ObjectID> &object_ids,
int num_returns, int64_t timeout_milliseconds, bool wait_local,
const TaskID &current_task_id);
/// Push an error to the relevant driver.
///
/// \param conn The connection information.
/// \param job_id The ID of the job that the error is for.
/// \param type The type of the error.
/// \param error_message The error message.
/// \param timestamp The timestamp of the error.
/// \return Void.
void local_scheduler_push_error(LocalSchedulerConnection *conn, const JobID &job_id,
const std::string &type, const std::string &error_message,
double timestamp);
/// Store some profile events in the GCS.
///
/// \param conn The connection information.
/// \param profile_events A batch of profiling event information.
/// \return Void.
void local_scheduler_push_profile_events(LocalSchedulerConnection *conn,
const ProfileTableDataT &profile_events);
/// Free a list of objects from object stores.
///
/// \param conn The connection information.
/// \param object_ids A list of ObjectsIDs to be deleted.
/// \param local_only Whether to free the objects only in the local object
/// store, or in all object stores.
/// \return Void.
void local_scheduler_free_objects_in_object_store(
LocalSchedulerConnection *conn, const std::vector<ray::ObjectID> &object_ids,
bool local_only);
#endif
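
For reference, a minimal caller of the legacy C-style API declared above might
look like the following sketch. It assumes a LocalSchedulerConnection that was
established elsewhere; the helper name, timeout, and argument values are
example assumptions, not part of the original code.

void ExampleLegacyWait(LocalSchedulerConnection *conn,
                       const std::vector<ObjectID> &object_ids,
                       const TaskID &current_task_id) {
  // Wait for at most one of the objects, for up to one second, locally only.
  auto result = local_scheduler_wait(conn, object_ids, /*num_returns=*/1,
                                     /*timeout_milliseconds=*/1000,
                                     /*wait_local=*/true, current_task_id);
  // result.first holds the IDs that were found; result.second the rest.
}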


@ -0,0 +1,360 @@
#include "raylet_client.h"
#include <inttypes.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include "ray/common/common_protocol.h"
#include "ray/ray_config.h"
#include "ray/raylet/format/node_manager_generated.h"
#include "ray/raylet/task_spec.h"
#include "ray/util/logging.h"
using MessageType = ray::protocol::MessageType;
// TODO(rkn): The io methods below should be removed.
int connect_ipc_sock(const std::string &socket_pathname) {
struct sockaddr_un socket_address;
int socket_fd;
socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (socket_fd < 0) {
RAY_LOG(ERROR) << "socket() failed for pathname " << socket_pathname;
return -1;
}
memset(&socket_address, 0, sizeof(socket_address));
socket_address.sun_family = AF_UNIX;
if (socket_pathname.length() + 1 > sizeof(socket_address.sun_path)) {
RAY_LOG(ERROR) << "Socket pathname is too long.";
close(socket_fd);
return -1;
}
strncpy(socket_address.sun_path, socket_pathname.c_str(), socket_pathname.length() + 1);
if (connect(socket_fd, (struct sockaddr *)&socket_address, sizeof(socket_address)) !=
0) {
close(socket_fd);
return -1;
}
return socket_fd;
}
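// Illustrative sketch (not part of the original file): a one-shot caller of
// connect_ipc_sock. The socket path is an assumed example value; the caller
// owns the returned descriptor and must close() it.
static int ExampleConnectOnce() {
  int fd = connect_ipc_sock("/tmp/raylet_socket");
  if (fd < 0) {
    RAY_LOG(ERROR) << "Unable to reach the raylet.";
  }
  return fd;
}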
int read_bytes(int socket_fd, uint8_t *cursor, size_t length) {
ssize_t nbytes = 0;
// Termination condition: EOF or read 'length' bytes total.
size_t bytesleft = length;
size_t offset = 0;
while (bytesleft > 0) {
nbytes = read(socket_fd, cursor + offset, bytesleft);
if (nbytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
}
return -1; // Errno will be set.
} else if (0 == nbytes) {
// Encountered early EOF.
return -1;
}
RAY_CHECK(nbytes > 0);
bytesleft -= nbytes;
offset += nbytes;
}
return 0;
}
int write_bytes(int socket_fd, uint8_t *cursor, size_t length) {
ssize_t nbytes = 0;
size_t bytesleft = length;
size_t offset = 0;
while (bytesleft > 0) {
// While we haven't written the whole message, write to the file
// descriptor, advance the cursor, and decrease the amount left to write.
nbytes = write(socket_fd, cursor + offset, bytesleft);
if (nbytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
continue;
}
return -1; // Errno will be set.
} else if (0 == nbytes) {
// Encountered early EOF.
return -1;
}
RAY_CHECK(nbytes > 0);
bytesleft -= nbytes;
offset += nbytes;
}
return 0;
}
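// Sketch of the contract above (assumes a connected, blocking descriptor):
// read_bytes/write_bytes transfer exactly `length` bytes or report failure,
// so fixed-size values can be moved without partial-I/O handling at the call
// site. ExampleEchoInt64 is a hypothetical helper, not part of this file.
static int ExampleEchoInt64(int socket_fd) {
  int64_t value = 42;
  if (write_bytes(socket_fd, (uint8_t *)&value, sizeof(value)) != 0) return -1;
  int64_t echoed = 0;
  if (read_bytes(socket_fd, (uint8_t *)&echoed, sizeof(echoed)) != 0) return -1;
  return echoed == value ? 0 : -1;
}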
RayletConnection::RayletConnection(const std::string &raylet_socket, int num_retries,
int64_t timeout) {
// Pick the default values if the user did not specify.
if (num_retries < 0) {
num_retries = RayConfig::instance().num_connect_attempts();
}
if (timeout < 0) {
timeout = RayConfig::instance().connect_timeout_milliseconds();
}
RAY_CHECK(!raylet_socket.empty());
conn_ = -1;
for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) {
conn_ = connect_ipc_sock(raylet_socket);
if (conn_ >= 0) break;
if (num_attempts > 0) {
RAY_LOG(ERROR) << "Retrying to connect to socket for pathname " << raylet_socket
<< " (num_attempts = " << num_attempts
<< ", num_retries = " << num_retries << ")";
}
// Sleep for timeout milliseconds.
usleep(timeout * 1000);
}
// If we could not connect to the socket, exit.
if (conn_ == -1) {
RAY_LOG(FATAL) << "Could not connect to socket " << raylet_socket;
}
}
ray::Status RayletConnection::Disconnect() {
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreateDisconnectClient(fbb);
fbb.Finish(message);
auto status = WriteMessage(MessageType::IntentionalDisconnectClient, &fbb);
// Don't be too strict about disconnection errors; just log them instead of
// crashing.
if (!status.ok()) {
RAY_LOG(ERROR) << status.ToString()
<< " [RayletClient] Failed to disconnect from raylet.";
}
return ray::Status::OK();
}
ray::Status RayletConnection::ReadMessage(MessageType type,
std::unique_ptr<uint8_t[]> &message) {
int64_t version;
int64_t type_field;
int64_t length;
int closed = read_bytes(conn_, (uint8_t *)&version, sizeof(version));
if (closed) goto disconnected;
RAY_CHECK(version == RayConfig::instance().ray_protocol_version());
closed = read_bytes(conn_, (uint8_t *)&type_field, sizeof(type_field));
if (closed) goto disconnected;
closed = read_bytes(conn_, (uint8_t *)&length, sizeof(length));
if (closed) goto disconnected;
message = std::unique_ptr<uint8_t[]>(new uint8_t[length]);
closed = read_bytes(conn_, message.get(), length);
if (closed) {
// Handle the case in which the socket is closed.
message.reset(nullptr);
disconnected:
message = nullptr;
type_field = static_cast<int64_t>(MessageType::DisconnectClient);
length = 0;
}
if (type_field == static_cast<int64_t>(MessageType::DisconnectClient)) {
return ray::Status::IOError("[RayletClient] Raylet connection closed.");
}
if (type_field != static_cast<int64_t>(type)) {
return ray::Status::TypeError(
std::string("[RayletClient] Raylet connection corrupted. ") +
"Expected message type: " + std::to_string(static_cast<int64_t>(type)) +
"; got message type: " + std::to_string(type_field) +
". Check logs or dmesg for previous errors.");
}
return ray::Status::OK();
}
ray::Status RayletConnection::WriteMessage(MessageType type,
flatbuffers::FlatBufferBuilder *fbb) {
std::unique_lock<std::mutex> guard(write_mutex_);
int64_t version = RayConfig::instance().ray_protocol_version();
int64_t length = fbb ? fbb->GetSize() : 0;
uint8_t *bytes = fbb ? fbb->GetBufferPointer() : nullptr;
int64_t type_field = static_cast<int64_t>(type);
auto io_error = ray::Status::IOError("[RayletClient] Connection closed unexpectedly.");
int closed;
closed = write_bytes(conn_, (uint8_t *)&version, sizeof(version));
if (closed) return io_error;
closed = write_bytes(conn_, (uint8_t *)&type_field, sizeof(type_field));
if (closed) return io_error;
closed = write_bytes(conn_, (uint8_t *)&length, sizeof(length));
if (closed) return io_error;
closed = write_bytes(conn_, bytes, length * sizeof(char));
if (closed) return io_error;
return ray::Status::OK();
}
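// The functions above frame every message the same way, in host byte order:
//   [int64 version][int64 message type][int64 payload length][payload bytes]
// A hand-rolled writer equivalent to WriteMessage (an illustrative sketch,
// without the locking that WriteMessage performs) would be:
static int ExampleWriteFrame(int fd, int64_t type, uint8_t *payload,
                             int64_t length) {
  int64_t version = RayConfig::instance().ray_protocol_version();
  if (write_bytes(fd, (uint8_t *)&version, sizeof(version)) != 0) return -1;
  if (write_bytes(fd, (uint8_t *)&type, sizeof(type)) != 0) return -1;
  if (write_bytes(fd, (uint8_t *)&length, sizeof(length)) != 0) return -1;
  return write_bytes(fd, payload, length);
}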
ray::Status RayletConnection::AtomicRequestReply(
MessageType request_type, MessageType reply_type,
std::unique_ptr<uint8_t[]> &reply_message, flatbuffers::FlatBufferBuilder *fbb) {
std::unique_lock<std::mutex> guard(mutex_);
auto status = WriteMessage(request_type, fbb);
if (!status.ok()) return status;
return ReadMessage(reply_type, reply_message);
}
RayletClient::RayletClient(const std::string &raylet_socket, const UniqueID &client_id,
bool is_worker, const JobID &driver_id,
const Language &language)
: client_id_(client_id),
is_worker_(is_worker),
driver_id_(driver_id),
language_(language) {
// With C++14 we could use std::make_unique here.
conn_ = std::unique_ptr<RayletConnection>(new RayletConnection(raylet_socket, -1, -1));
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreateRegisterClientRequest(
fbb, is_worker, to_flatbuf(fbb, client_id), getpid(), to_flatbuf(fbb, driver_id),
language);
fbb.Finish(message);
// Register the process ID with the raylet.
// NOTE(swang): If raylet exits and we are registered as a worker, we will get killed.
auto status = conn_->WriteMessage(MessageType::RegisterClientRequest, &fbb);
RAY_CHECK_OK_PREPEND(status, "[RayletClient] Unable to register worker with raylet.");
}
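// Hypothetical usage sketch: registering a worker with a raylet. The socket
// path is an example value, and the ID factories shown are assumed to be the
// usual ray::UniqueID helpers.
void ExampleRegisterWorker() {
  RayletClient client("/tmp/raylet_socket", UniqueID::from_random(),
                      /*is_worker=*/true, /*driver_id=*/JobID::nil(),
                      Language::PYTHON);
  RAY_CHECK(client.IsWorker());
}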
ray::Status RayletClient::SubmitTask(const std::vector<ObjectID> &execution_dependencies,
const ray::raylet::TaskSpecification &task_spec) {
flatbuffers::FlatBufferBuilder fbb;
auto execution_dependencies_message = to_flatbuf(fbb, execution_dependencies);
auto message = ray::protocol::CreateSubmitTaskRequest(
fbb, execution_dependencies_message, task_spec.ToFlatbuffer(fbb));
fbb.Finish(message);
return conn_->WriteMessage(MessageType::SubmitTask, &fbb);
}
ray::Status RayletClient::GetTask(
std::unique_ptr<ray::raylet::TaskSpecification> *task_spec) {
std::unique_ptr<uint8_t[]> reply;
// Receive a task from the raylet. This will block until the raylet
// assigns a task to this client.
auto status =
conn_->AtomicRequestReply(MessageType::GetTask, MessageType::ExecuteTask, reply);
if (!status.ok()) return status;
// Parse the flatbuffer object.
auto reply_message = flatbuffers::GetRoot<ray::protocol::GetTaskReply>(reply.get());
// Set the resource IDs for this task.
resource_ids_.clear();
for (size_t i = 0; i < reply_message->fractional_resource_ids()->size(); ++i) {
auto const &fractional_resource_ids =
reply_message->fractional_resource_ids()->Get(i);
auto &acquired_resources =
resource_ids_[string_from_flatbuf(*fractional_resource_ids->resource_name())];
size_t num_resource_ids = fractional_resource_ids->resource_ids()->size();
size_t num_resource_fractions = fractional_resource_ids->resource_fractions()->size();
RAY_CHECK(num_resource_ids == num_resource_fractions);
RAY_CHECK(num_resource_ids > 0);
for (size_t j = 0; j < num_resource_ids; ++j) {
int64_t resource_id = fractional_resource_ids->resource_ids()->Get(j);
double resource_fraction = fractional_resource_ids->resource_fractions()->Get(j);
if (num_resource_ids > 1) {
int64_t whole_fraction = resource_fraction;
RAY_CHECK(whole_fraction == resource_fraction);
}
acquired_resources.push_back(std::make_pair(resource_id, resource_fraction));
}
}
// Return a copy of the task spec, passing ownership to the caller.
task_spec->reset(new ray::raylet::TaskSpecification(
string_from_flatbuf(*reply_message->task_spec())));
return ray::Status::OK();
}
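// Sketch of consuming the resource mapping that GetTask populates. This
// assumes the conventional "CPU" resource name; the helper is illustrative
// and not part of this file.
double ExampleTotalCpuShare(const ResourceMappingType &resource_ids) {
  double total = 0.0;
  auto it = resource_ids.find("CPU");
  if (it != resource_ids.end()) {
    for (const auto &id_and_fraction : it->second) {
      total += id_and_fraction.second;  // The fraction held of this resource ID.
    }
  }
  return total;
}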
ray::Status RayletClient::TaskDone() {
return conn_->WriteMessage(MessageType::TaskDone);
}
ray::Status RayletClient::FetchOrReconstruct(const std::vector<ObjectID> &object_ids,
bool fetch_only,
const TaskID &current_task_id) {
flatbuffers::FlatBufferBuilder fbb;
auto object_ids_message = to_flatbuf(fbb, object_ids);
auto message = ray::protocol::CreateFetchOrReconstruct(
fbb, object_ids_message, fetch_only, to_flatbuf(fbb, current_task_id));
fbb.Finish(message);
auto status = conn_->WriteMessage(MessageType::FetchOrReconstruct, &fbb);
return status;
}
ray::Status RayletClient::NotifyUnblocked(const TaskID &current_task_id) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
ray::protocol::CreateNotifyUnblocked(fbb, to_flatbuf(fbb, current_task_id));
fbb.Finish(message);
return conn_->WriteMessage(MessageType::NotifyUnblocked, &fbb);
}
ray::Status RayletClient::Wait(const std::vector<ObjectID> &object_ids, int num_returns,
int64_t timeout_milliseconds, bool wait_local,
const TaskID &current_task_id, WaitResultPair *result) {
// Write request.
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreateWaitRequest(
fbb, to_flatbuf(fbb, object_ids), num_returns, timeout_milliseconds, wait_local,
to_flatbuf(fbb, current_task_id));
fbb.Finish(message);
std::unique_ptr<uint8_t[]> reply;
auto status = conn_->AtomicRequestReply(MessageType::WaitRequest,
MessageType::WaitReply, reply, &fbb);
if (!status.ok()) return status;
// Parse the flatbuffer object.
auto reply_message = flatbuffers::GetRoot<ray::protocol::WaitReply>(reply.get());
auto found = reply_message->found();
for (size_t i = 0; i < found->size(); i++) {
ObjectID object_id = ObjectID::from_binary(found->Get(i)->str());
result->first.push_back(object_id);
}
auto remaining = reply_message->remaining();
for (size_t i = 0; i < remaining->size(); i++) {
ObjectID object_id = ObjectID::from_binary(remaining->Get(i)->str());
result->second.push_back(object_id);
}
return ray::Status::OK();
}
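// Hypothetical caller of Wait: block up to one second for the first of the
// given objects to become available locally. The helper name and argument
// values are example assumptions.
ray::Status ExampleWaitForOne(RayletClient &client,
                              const std::vector<ObjectID> &object_ids,
                              const TaskID &current_task_id) {
  WaitResultPair result;
  ray::Status status =
      client.Wait(object_ids, /*num_returns=*/1, /*timeout_milliseconds=*/1000,
                  /*wait_local=*/true, current_task_id, &result);
  if (status.ok()) {
    RAY_LOG(INFO) << result.first.size() << " object(s) found, "
                  << result.second.size() << " still missing.";
  }
  return status;
}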
ray::Status RayletClient::PushError(const JobID &job_id, const std::string &type,
const std::string &error_message, double timestamp) {
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreatePushErrorRequest(
fbb, to_flatbuf(fbb, job_id), fbb.CreateString(type),
fbb.CreateString(error_message), timestamp);
fbb.Finish(message);
return conn_->WriteMessage(MessageType::PushErrorRequest, &fbb);
}
ray::Status RayletClient::PushProfileEvents(const ProfileTableDataT &profile_events) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreateProfileTableData(fbb, &profile_events);
fbb.Finish(message);
auto status = conn_->WriteMessage(MessageType::PushProfileEventsRequest, &fbb);
// Don't be too strict about profile errors; just log them instead of crashing.
if (!status.ok()) {
RAY_LOG(ERROR) << status.ToString()
<< " [RayletClient] Failed to push profile events.";
}
return ray::Status::OK();
}
ray::Status RayletClient::FreeObjects(const std::vector<ray::ObjectID> &object_ids,
bool local_only) {
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreateFreeObjectsRequest(fbb, local_only,
to_flatbuf(fbb, object_ids));
fbb.Finish(message);
auto status = conn_->WriteMessage(MessageType::FreeObjectsInObjectStoreRequest, &fbb);
return status;
}


@ -0,0 +1,171 @@
#ifndef RAYLET_CLIENT_H
#define RAYLET_CLIENT_H
#include <unistd.h>
#include <mutex>
#include <unordered_map>
#include <vector>
#include "ray/raylet/task_spec.h"
#include "ray/status.h"
using ray::ActorID;
using ray::JobID;
using ray::ObjectID;
using ray::TaskID;
using ray::UniqueID;
using MessageType = ray::protocol::MessageType;
using ResourceMappingType =
std::unordered_map<std::string, std::vector<std::pair<int64_t, double>>>;
using WaitResultPair = std::pair<std::vector<ObjectID>, std::vector<ObjectID>>;
class RayletConnection {
public:
/// Connect to the raylet.
///
/// \param raylet_socket The name of the socket to use to connect to the raylet.
/// \param num_retries The number of times to retry connecting before giving
/// up. A negative value means the default from RayConfig is used.
/// \param timeout The number of milliseconds to wait between connection
/// attempts. A negative value means the default from RayConfig is used.
RayletConnection(const std::string &raylet_socket, int num_retries, int64_t timeout);
~RayletConnection() { close(conn_); }
/// Notify the raylet that this client is disconnecting gracefully. This
/// is used by actors to exit gracefully so that the raylet doesn't
/// propagate an error message to the driver.
///
/// \return ray::Status.
ray::Status Disconnect();
/// Read a message of the given type from the raylet, blocking until it arrives.
ray::Status ReadMessage(MessageType type, std::unique_ptr<uint8_t[]> &message);
/// Write a message of the given type, with an optional flatbuffer payload.
ray::Status WriteMessage(MessageType type,
                         flatbuffers::FlatBufferBuilder *fbb = nullptr);
/// Send a request and block until the matching reply arrives, atomically with
/// respect to other requests on this connection.
ray::Status AtomicRequestReply(MessageType request_type, MessageType reply_type,
                               std::unique_ptr<uint8_t[]> &reply_message,
                               flatbuffers::FlatBufferBuilder *fbb = nullptr);
private:
/// File descriptor of the Unix domain socket that connects to raylet.
int conn_;
/// A mutex to protect stateful operations of the raylet client.
std::mutex mutex_;
/// A mutex to protect write operations of the raylet client.
std::mutex write_mutex_;
};
class RayletClient {
public:
/// Connect to the raylet.
///
/// \param raylet_socket The name of the socket to use to connect to the raylet.
/// \param client_id A unique ID to represent the client.
/// \param is_worker Whether this client is a worker. If it is a worker, an
/// additional message will be sent to register as one.
/// \param driver_id The ID of the driver. This is non-nil if the client is a driver.
/// \param language The language of the client (e.g., Python or Java).
RayletClient(const std::string &raylet_socket, const UniqueID &client_id,
bool is_worker, const JobID &driver_id, const Language &language);
/// Notify the raylet that this client is disconnecting gracefully.
///
/// \return ray::Status.
ray::Status Disconnect() { return conn_->Disconnect(); }
/// Submit a task using the raylet code path.
///
/// \param execution_dependencies The execution dependencies.
/// \param task_spec The task specification.
/// \return ray::Status.
ray::Status SubmitTask(const std::vector<ObjectID> &execution_dependencies,
const ray::raylet::TaskSpecification &task_spec);
/// Get the next task for this client. This will block until the raylet assigns
/// a task to this worker. Ownership of the returned task specification is
/// transferred to the caller through the unique_ptr.
///
/// \param task_spec An [out] parameter that receives the assigned task.
/// \return ray::Status.
ray::Status GetTask(std::unique_ptr<ray::raylet::TaskSpecification> *task_spec);
/// Tell the raylet that the client has finished executing a task.
///
/// \return ray::Status.
ray::Status TaskDone();
/// Tell the raylet to reconstruct or fetch objects.
///
/// \param object_ids The IDs of the objects to reconstruct.
/// \param fetch_only Only fetch objects, do not reconstruct them.
/// \param current_task_id The task that needs the objects.
/// \return ray::Status.
ray::Status FetchOrReconstruct(const std::vector<ObjectID> &object_ids, bool fetch_only,
const TaskID &current_task_id);
/// Notify the raylet that this client (worker) is no longer blocked.
///
/// \param current_task_id The task that is no longer blocked.
/// \return ray::Status.
ray::Status NotifyUnblocked(const TaskID &current_task_id);
/// Wait for the given objects until the timeout expires or num_returns objects are
/// found.
///
/// \param object_ids The objects to wait for.
/// \param num_returns The number of objects to wait for.
/// \param timeout_milliseconds Duration, in milliseconds, to wait before returning.
/// \param wait_local Whether to wait for objects to appear on this node.
/// \param current_task_id The task that called wait.
/// \param result A pair with the first element containing the object ids that were
/// found, and the second element the objects that were not found.
/// \return ray::Status.
ray::Status Wait(const std::vector<ObjectID> &object_ids, int num_returns,
int64_t timeout_milliseconds, bool wait_local,
const TaskID &current_task_id, WaitResultPair *result);
/// Push an error to the relevant driver.
///
/// \param job_id The ID of the job that the error is for.
/// \param type The type of the error.
/// \param error_message The error message.
/// \param timestamp The timestamp of the error.
/// \return ray::Status.
ray::Status PushError(const JobID &job_id, const std::string &type,
const std::string &error_message, double timestamp);
/// Store some profile events in the GCS.
///
/// \param profile_events A batch of profiling event information.
/// \return ray::Status.
ray::Status PushProfileEvents(const ProfileTableDataT &profile_events);
/// Free a list of objects from object stores.
///
/// \param object_ids A list of ObjectsIDs to be deleted.
/// \param local_only Whether to free the objects only in the local object
/// store, or in all object stores.
/// \return ray::Status.
ray::Status FreeObjects(const std::vector<ray::ObjectID> &object_ids, bool local_only);
Language GetLanguage() const { return language_; }
UniqueID GetClientID() const { return client_id_; }
JobID GetDriverID() const { return driver_id_; }
bool IsWorker() const { return is_worker_; }
const ResourceMappingType &GetResourceIDs() const { return resource_ids_; }
private:
const UniqueID client_id_;
const bool is_worker_;
const JobID driver_id_;
const Language language_;
/// A map from resource name to the resource IDs that are currently reserved
/// for this worker. Each pair consists of the resource ID and the fraction
/// of that resource allocated for this worker.
ResourceMappingType resource_ids_;
/// The connection to the raylet server.
std::unique_ptr<RayletConnection> conn_;
};
#endif
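
A short end-to-end sketch of the class declared above (illustrative only; the
socket path is an example value, the ID factories are the usual ray::UniqueID
helpers, and error handling is kept minimal):

#include "raylet_client.h"

ray::Status ExampleWorkerLoopOnce(const std::string &raylet_socket) {
  RayletClient client(raylet_socket, UniqueID::from_random(),
                      /*is_worker=*/true, JobID::nil(), Language::PYTHON);
  std::unique_ptr<ray::raylet::TaskSpecification> task_spec;
  // Blocks until the raylet assigns a task to this worker.
  RAY_RETURN_NOT_OK(client.GetTask(&task_spec));
  // ... execute the task ...
  RAY_RETURN_NOT_OK(client.TaskDone());
  return client.Disconnect();
}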


@ -541,8 +541,8 @@ def test_driver_put_errors(ray_start_driver_put_errors):
# were evicted and whose originating tasks are still running, this
# for-loop should hang on its first iteration and push an error to the
# driver.
ray.worker.global_worker.local_scheduler_client.fetch_or_reconstruct(
[args[0]], False)
ray.worker.global_worker.raylet_client.fetch_or_reconstruct([args[0]],
False)
def error_check(errors):
return len(errors) > 1