Use scoped enums in C++ and flatbuffers. (#2194)
* Enable --scoped-enums in flatbuffer compiler.
* Change enum to c++11 style (enum class).
* Resolve conflicts.
* Solve building failure when RAY_USE_NEW_GCS=on and remove ERROR_INDEX suffix.
* Merge with master and fix CI failure.
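In short, the change replaces C-style enums, whose enumerators are plain integers living in the enclosing scope, with C++11 scoped enums. A minimal sketch of the pattern, using the TaskStatus enum from this diff (the trimmed body, the operator| shown here, and the main() driver are illustrative stand-ins, not copied verbatim from the tree):

#include <cstdint>
#include <iostream>

// Trimmed-down TaskStatus, following the enum class introduced in task.h below.
enum class TaskStatus : unsigned int {
  WAITING = 1,
  SCHEDULED = 2,
  DONE = 16,
  LOST = 32,
};

// Scoped enumerators no longer convert to integers implicitly, so bitmask
// composition needs an explicit operator (the diff adds one much like this).
inline TaskStatus operator|(const TaskStatus &a, const TaskStatus &b) {
  return static_cast<TaskStatus>(static_cast<unsigned int>(a) |
                                 static_cast<unsigned int>(b));
}

int main() {
  // Before: `int state = TASK_STATUS_DONE;` compiled silently.
  // After: values are named through the enum and cast explicitly wherever an
  // integer is required, which is why the diff is full of static_cast<>.
  TaskStatus state = TaskStatus::DONE | TaskStatus::LOST;
  std::cout << static_cast<unsigned int>(state) << std::endl;  // prints 48
  return 0;
}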
This commit is contained in:
parent f0907a6ee9
commit 0a34bea0b0
42 changed files with 381 additions and 352 deletions
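The first hunk below adds --scoped-enums to the flatc invocation. For orientation, here is roughly how that flag changes the generated C++ for a schema enum; both forms are hand-written stand-ins inferred from the renamed identifiers in this diff (TablePubsub_TASK becoming TablePubsub::TASK, TablePubsub_MAX becoming TablePubsub::MAX), and the numeric values are placeholders:

#include <cstdint>

namespace unscoped {
// Default flatc output: a plain enum whose enumerators carry the type name as
// a prefix and convert to int implicitly.
enum TablePubsub {
  TablePubsub_MIN = 0,
  TablePubsub_TASK = 1,
  TablePubsub_MAX = 2
};
}  // namespace unscoped

namespace scoped {
// With --scoped-enums: an enum class, so call sites say TablePubsub::TASK and
// must static_cast<> whenever an integer is needed (e.g. for sprintf).
enum class TablePubsub : int32_t {
  MIN = 0,
  TASK = 1,
  MAX = 2
};
}  // namespace scoped

int main() {
  int before = unscoped::TablePubsub_TASK;                   // implicit conversion
  int after = static_cast<int>(scoped::TablePubsub::TASK);   // explicit cast
  return before == after ? 0 : 1;
}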
@ -24,7 +24,7 @@ add_custom_command(
# flatbuffers message Message, which can be used to store deserialized
# messages in data structures. This is currently used for ObjectInfo for
# example.
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${COMMON_FBS_SRC} --gen-object-api
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${COMMON_FBS_SRC} --gen-object-api --scoped-enums
DEPENDS ${FBS_DEPENDS}
COMMENT "Running flatc compiler on ${COMMON_FBS_SRC}"
VERBATIM)
@ -322,7 +322,7 @@ void read_message(int fd, int64_t *type, int64_t *length, uint8_t **bytes) {

disconnected:
/* Handle the case in which the socket is closed. */
*type = DISCONNECT_CLIENT;
*type = static_cast<int64_t>(CommonMessageType::DISCONNECT_CLIENT);
*length = 0;
*bytes = NULL;
return;
@ -382,13 +382,14 @@ int64_t read_vector(int fd, int64_t *type, std::vector<uint8_t> &buffer) {
return length;
disconnected:
/* Handle the case in which the socket is closed. */
*type = DISCONNECT_CLIENT;
*type = static_cast<int64_t>(CommonMessageType::DISCONNECT_CLIENT);
return 0;
}

void write_log_message(int fd, const char *message) {
/* Account for the \0 at the end of the string. */
write_message(fd, LOG_MESSAGE, strlen(message) + 1, (uint8_t *) message);
write_message(fd, static_cast<int64_t>(CommonMessageType::LOG_MESSAGE),
strlen(message) + 1, (uint8_t *) message);
}

char *read_log_message(int fd) {
@ -396,6 +397,7 @@ char *read_log_message(int fd) {
int64_t type;
int64_t length;
read_message(fd, &type, &length, &bytes);
RAY_CHECK(type == LOG_MESSAGE);
RAY_CHECK(static_cast<CommonMessageType>(type) ==
CommonMessageType::LOG_MESSAGE);
return (char *) bytes;
}
@ -9,7 +9,7 @@
struct aeEventLoop;
typedef aeEventLoop event_loop;

enum common_message_type {
enum class CommonMessageType : int32_t {
/** Disconnect a client. */
DISCONNECT_CLIENT,
/** Log a message from a client. */
@ -76,8 +76,8 @@ TablePubsub ParseTablePubsub(const RedisModuleString *pubsub_channel_str) {
pubsub_channel_str, &pubsub_channel_long) == REDISMODULE_OK)
<< "Pubsub channel must be a valid TablePubsub";
auto pubsub_channel = static_cast<TablePubsub>(pubsub_channel_long);
RAY_CHECK(pubsub_channel >= TablePubsub_MIN &&
pubsub_channel <= TablePubsub_MAX)
RAY_CHECK(pubsub_channel >= TablePubsub::MIN &&
pubsub_channel <= TablePubsub::MAX)
<< "Pubsub channel must be a valid TablePubsub";
return pubsub_channel;
}

@ -90,8 +90,9 @@ RedisModuleString *FormatPubsubChannel(
const RedisModuleString *id) {
// Format the pubsub channel enum to a string. TablePubsub_MAX should be more
// than enough digits, but add 1 just in case for the null terminator.
char pubsub_channel[TablePubsub_MAX + 1];
sprintf(pubsub_channel, "%d", ParseTablePubsub(pubsub_channel_str));
char pubsub_channel[static_cast<int>(TablePubsub::MAX) + 1];
sprintf(pubsub_channel, "%d",
static_cast<int>(ParseTablePubsub(pubsub_channel_str)));
return RedisString_Format(ctx, "%s:%S", pubsub_channel, id);
}

@ -123,12 +124,12 @@ RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
REDISMODULE_OK)
<< "Prefix must be a valid TablePrefix";
auto prefix = static_cast<TablePrefix>(prefix_long);
RAY_CHECK(prefix != TablePrefix_UNUSED)
RAY_CHECK(prefix != TablePrefix::UNUSED)
<< "This table has no prefix registered";
RAY_CHECK(prefix >= TablePrefix_MIN && prefix <= TablePrefix_MAX)
RAY_CHECK(prefix >= TablePrefix::MIN && prefix <= TablePrefix::MAX)
<< "Prefix must be a valid TablePrefix";
return OpenPrefixedKey(ctx, table_prefixes[prefix], keyname, mode,
mutated_key_str);
return OpenPrefixedKey(ctx, table_prefixes[static_cast<long long>(prefix)],
keyname, mode, mutated_key_str);
}

RedisModuleKey *OpenPrefixedKey(RedisModuleCtx *ctx,
@ -486,14 +487,15 @@ int PublishTaskTableAdd(RedisModuleCtx *ctx,
auto message = flatbuffers::GetRoot<TaskTableData>(buf);
RAY_CHECK(message != nullptr);

if (message->scheduling_state() == SchedulingState_WAITING ||
message->scheduling_state() == SchedulingState_SCHEDULED) {
if (message->scheduling_state() == SchedulingState::WAITING ||
message->scheduling_state() == SchedulingState::SCHEDULED) {
/* Build the PUBLISH topic and message for task table subscribers. The
* topic
* is a string in the format "TASK_PREFIX:<local scheduler ID>:<state>".
* The
* message is a serialized SubscribeToTasksReply flatbuffer object. */
std::string state = std::to_string(message->scheduling_state());
std::string state =
std::to_string(static_cast<int>(message->scheduling_state()));
RedisModuleString *publish_topic = RedisString_Format(
ctx, "%s%b:%s", TASK_PREFIX, message->scheduler_id()->str().data(),
sizeof(DBClientID), state.c_str());
@ -501,12 +503,13 @@ int PublishTaskTableAdd(RedisModuleCtx *ctx,
/* Construct the flatbuffers object for the payload. */
flatbuffers::FlatBufferBuilder fbb;
/* Create the flatbuffers message. */
auto msg = CreateTaskReply(
fbb, RedisStringToFlatbuf(fbb, id), message->scheduling_state(),
fbb.CreateString(message->scheduler_id()),
fbb.CreateString(message->execution_dependencies()),
fbb.CreateString(message->task_info()), message->spillback_count(),
true /* not used */);
auto msg =
CreateTaskReply(fbb, RedisStringToFlatbuf(fbb, id),
static_cast<long long>(message->scheduling_state()),
fbb.CreateString(message->scheduler_id()),
fbb.CreateString(message->execution_dependencies()),
fbb.CreateString(message->task_info()),
message->spillback_count(), true /* not used */);
fbb.Finish(msg);

RedisModuleString *publish_message = RedisModule_CreateString(
@ -613,12 +616,12 @@ int TableAdd_DoPublish(RedisModuleCtx *ctx,

TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str);

if (pubsub_channel == TablePubsub_TASK) {
if (pubsub_channel == TablePubsub::TASK) {
// Publish the task to its subscribers.
// TODO(swang): This is only necessary for legacy Ray and should be removed
// once we switch to using the new GCS API for the task table.
return PublishTaskTableAdd(ctx, id, data);
} else if (pubsub_channel != TablePubsub_NO_PUBLISH) {
} else if (pubsub_channel != TablePubsub::NO_PUBLISH) {
// All other pubsub channels write the data back directly onto the channel.
return PublishTableAdd(ctx, pubsub_channel_str, id, data);
} else {
@ -723,7 +726,7 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx,
RAY_CHECK(flags == REDISMODULE_ZADD_ADDED) << "Appended a duplicate entry";
// Publish a message on the requested pubsub channel if necessary.
TablePubsub pubsub_channel = ParseTablePubsub(pubsub_channel_str);
if (pubsub_channel != TablePubsub_NO_PUBLISH) {
if (pubsub_channel != TablePubsub::NO_PUBLISH) {
// All other pubsub channels write the data back directly onto the
// channel.
return PublishTableAdd(ctx, pubsub_channel_str, id, data);
@ -956,7 +959,8 @@ int TableTestAndUpdate_RedisCommand(RedisModuleCtx *ctx,

auto update = flatbuffers::GetRoot<TaskTableTestAndUpdate>(update_buf);

bool do_update = data->scheduling_state() & update->test_state_bitmask();
bool do_update = static_cast<int>(data->scheduling_state()) &
static_cast<int>(update->test_state_bitmask());

if (!is_nil(update->test_scheduler_id()->str())) {
do_update =
@ -1460,8 +1464,8 @@ int TaskTableWrite(RedisModuleCtx *ctx,
"TaskSpec", task_spec, "spillback_count", spillback_count, NULL);
}

if (state_value == TASK_STATUS_WAITING ||
state_value == TASK_STATUS_SCHEDULED) {
if (static_cast<TaskStatus>(state_value) == TaskStatus::WAITING ||
static_cast<TaskStatus>(state_value) == TaskStatus::SCHEDULED) {
/* Build the PUBLISH topic and message for task table subscribers. The
* topic is a string in the format
* "TASK_PREFIX:<local scheduler ID>:<state>". The message is a serialized
@ -6,7 +6,7 @@ const char *error_types[] = {"object_hash_mismatch", "put_reconstruction",

void push_error(DBHandle *db_handle,
DBClientID driver_id,
int error_type,
ErrorIndex error_type,
const std::string &error_message) {
int64_t message_size = error_message.size();
@ -4,12 +4,26 @@
#include "db.h"
#include "table.h"

/// An ErrorIndex may be used as an index into error_types.
enum class ErrorIndex : int32_t {
/// An object was added with a different hash from the existing one.
OBJECT_HASH_MISMATCH = 0,
/// An object that was created through a ray.put is lost.
PUT_RECONSTRUCTION,
/// A worker died or was killed while executing a task.
WORKER_DIED,
/// An actor hasn't been created for a while.
ACTOR_NOT_CREATED,
/// The total number of error types.
MAX
};

/// Data that is needed to push an error.
typedef struct {
/// The ID of the driver to push the error to.
DBClientID driver_id;
/// An index into the error_types array indicating the type of the error.
int error_type;
ErrorIndex error_type;
/// The key to use for the error message in Redis.
UniqueID error_key;
/// The length of the error message.
@ -18,20 +32,6 @@ typedef struct {
uint8_t error_message[0];
} ErrorInfo;

/// An error_index may be used as an index into error_types.
typedef enum {
/// An object was added with a different hash from the existing one.
OBJECT_HASH_MISMATCH_ERROR_INDEX = 0,
/// An object that was created through a ray.put is lost.
PUT_RECONSTRUCTION_ERROR_INDEX,
/// A worker died or was killed while executing a task.
WORKER_DIED_ERROR_INDEX,
/// An actor hasn't been created for a while.
ACTOR_NOT_CREATED_ERROR_INDEX,
/// The total number of error types.
MAX_ERROR_INDEX
} error_index;

extern const char *error_types[];

/// Push an error to the given Python driver.
@ -39,12 +39,12 @@ extern const char *error_types[];
/// \param db_handle Database handle.
/// \param driver_id The ID of the Python driver to push the error to.
/// \param error_type An index specifying the type of the error. This should
/// be a value from the error_index enum.
/// be a value from the ErrorIndex enum.
/// \param error_message The error message to print.
/// \return Void.
void push_error(DBHandle *db_handle,
DBClientID driver_id,
int error_type,
ErrorIndex error_type,
const std::string &error_message);

#endif
@ -536,7 +536,7 @@ Task *parse_and_construct_task_from_redis_reply(redisReply *reply) {
flatbuffers::GetRoot<TaskExecutionDependencies>(
message->execution_dependencies()->data());
task = Task_alloc(
spec, task_spec_size, message->state(),
spec, task_spec_size, static_cast<TaskStatus>(message->state()),
from_flatbuf(*message->local_scheduler_id()),
from_flatbuf(*execution_dependencies->execution_dependencies()));
} else {
@ -932,7 +932,7 @@ void redis_task_table_add_task(TableCallbackData *callback_data) {
TaskID task_id = Task_task_id(task);
DBClientID local_scheduler_id = Task_local_scheduler(task);
redisAsyncContext *context = get_redis_context(db, task_id);
int state = Task_state(task);
int state = static_cast<int>(Task_state(task));

TaskExecutionSpec *execution_spec = Task_task_execution_spec(task);
TaskSpec *spec = execution_spec->Spec();
@ -998,7 +998,7 @@ void redis_task_table_update(TableCallbackData *callback_data) {
TaskID task_id = Task_task_id(task);
redisAsyncContext *context = get_redis_context(db, task_id);
DBClientID local_scheduler_id = Task_local_scheduler(task);
int state = Task_state(task);
int state = static_cast<int>(Task_state(task));

TaskExecutionSpec *execution_spec = Task_task_execution_spec(task);
flatbuffers::FlatBufferBuilder fbb;
@ -1108,7 +1108,7 @@ void redis_task_table_subscribe_callback(redisAsyncContext *c,
/* Handle a task table event. Parse the payload and call the callback. */
auto message = flatbuffers::GetRoot<TaskReply>(payload->str);
/* Extract the scheduling state. */
int64_t state = message->state();
TaskStatus state = static_cast<TaskStatus>(message->state());
/* Extract the local scheduler ID. */
DBClientID local_scheduler_id =
from_flatbuf(*message->local_scheduler_id());
@ -1673,9 +1673,10 @@ void redis_push_error_hmset_callback(redisAsyncContext *c,
void redis_push_error(TableCallbackData *callback_data) {
DBHandle *db = callback_data->db_handle;
ErrorInfo *info = (ErrorInfo *) callback_data->data->Get();
RAY_CHECK(info->error_type < MAX_ERROR_INDEX && info->error_type >= 0);
RAY_CHECK(info->error_type < ErrorIndex::MAX &&
info->error_type >= ErrorIndex::OBJECT_HASH_MISMATCH);
/// Look up the error type.
const char *error_type = error_types[info->error_type];
const char *error_type = error_types[static_cast<int>(info->error_type)];

/* Set the error information. */
int status = redisAsyncCommand(
@ -39,8 +39,8 @@ void task_table_test_and_update(
DBHandle *db_handle,
TaskID task_id,
DBClientID test_local_scheduler_id,
int test_state_bitmask,
int update_state,
TaskStatus test_state_bitmask,
TaskStatus update_state,
RetryInfo *retry,
task_table_test_and_update_callback done_callback,
void *user_context) {
@ -60,7 +60,7 @@ void task_table_test_and_update(
/* TODO(swang): A corresponding task_table_unsubscribe. */
void task_table_subscribe(DBHandle *db_handle,
DBClientID local_scheduler_id,
int state_filter,
TaskStatus state_filter,
task_table_subscribe_callback subscribe_callback,
void *subscribe_context,
RetryInfo *retry,
@ -122,8 +122,8 @@ void task_table_test_and_update(
DBHandle *db_handle,
TaskID task_id,
DBClientID test_local_scheduler_id,
int test_state_bitmask,
int update_state,
TaskStatus test_state_bitmask,
TaskStatus update_state,
RetryInfo *retry,
task_table_test_and_update_callback done_callback,
void *user_context);
@ -133,8 +133,8 @@ typedef struct {
/** The value to test the current local scheduler ID against. This field is
* ignored if equal to NIL_ID. */
DBClientID test_local_scheduler_id;
int test_state_bitmask;
int update_state;
TaskStatus test_state_bitmask;
TaskStatus update_state;
DBClientID local_scheduler_id;
} TaskTableTestAndUpdateData;

@ -171,7 +171,7 @@ typedef void (*task_table_subscribe_callback)(Task *task, void *user_context);
*/
void task_table_subscribe(DBHandle *db_handle,
DBClientID local_scheduler_id,
int state_filter,
TaskStatus state_filter,
task_table_subscribe_callback subscribe_callback,
void *subscribe_context,
RetryInfo *retry,
@ -182,7 +182,7 @@ void task_table_subscribe(DBHandle *db_handle,
* database. */
typedef struct {
DBClientID local_scheduler_id;
int state_filter;
TaskStatus state_filter;
task_table_subscribe_callback subscribe_callback;
void *subscribe_context;
} TaskTableSubscribeData;
@ -543,7 +543,7 @@ bool TaskExecutionSpec::IsStaticDependency(int64_t dependency_index) const {

Task *Task_alloc(const TaskSpec *spec,
int64_t task_spec_size,
int state,
TaskStatus state,
DBClientID local_scheduler_id,
const std::vector<ObjectID> &execution_dependencies) {
Task *result = new Task();
@ -556,7 +556,7 @@ Task *Task_alloc(const TaskSpec *spec,
}

Task *Task_alloc(TaskExecutionSpec &execution_spec,
int state,
TaskStatus state,
DBClientID local_scheduler_id) {
Task *result = new Task();
result->execution_spec = std::unique_ptr<TaskExecutionSpec>(
@ -575,11 +575,11 @@ int64_t Task_size(Task *task_arg) {
return sizeof(Task) - sizeof(TaskSpec) + task_arg->execution_spec->SpecSize();
}

int Task_state(Task *task) {
TaskStatus Task_state(Task *task) {
return task->state;
}

void Task_set_state(Task *task, int state) {
void Task_set_state(Task *task, TaskStatus state) {
task->state = state;
}
@ -518,26 +518,31 @@ void TaskSpec_free(TaskSpec *spec);

/** The scheduling_state can be used as a flag when we are listening
* for an event, for example TASK_WAITING | TASK_SCHEDULED. */
typedef enum {
enum class TaskStatus : uint {
/** The task is waiting to be scheduled. */
TASK_STATUS_WAITING = 1,
WAITING = 1,
/** The task has been scheduled to a node, but has not been queued yet. */
TASK_STATUS_SCHEDULED = 2,
SCHEDULED = 2,
/** The task has been queued on a node, where it will wait for its
* dependencies to become ready and a worker to become available. */
TASK_STATUS_QUEUED = 4,
QUEUED = 4,
/** The task is running on a worker. */
TASK_STATUS_RUNNING = 8,
RUNNING = 8,
/** The task is done executing. */
TASK_STATUS_DONE = 16,
DONE = 16,
/** The task was not able to finish. */
TASK_STATUS_LOST = 32,
LOST = 32,
/** The task will be submitted for reexecution. */
TASK_STATUS_RECONSTRUCTING = 64,
RECONSTRUCTING = 64,
/** An actor task is cached at a local scheduler and is waiting for the
* corresponding actor to be created. */
TASK_STATUS_ACTOR_CACHED = 128
} scheduling_state;
ACTOR_CACHED = 128
};

inline TaskStatus operator|(const TaskStatus &a, const TaskStatus &b) {
uint c = static_cast<uint>(a) | static_cast<uint>(b);
return static_cast<TaskStatus>(c);
}

/** A task is an execution of a task specification. It has a state of execution
* (see scheduling_state) and the ID of the local scheduler it is scheduled on
@ -545,7 +550,7 @@ typedef enum {

struct Task {
/** The scheduling state of the task. */
int state;
TaskStatus state;
/** The ID of the local scheduler involved. */
DBClientID local_scheduler_id;
/** The execution specification for this task. */
@ -562,12 +567,12 @@ struct Task {
*/
Task *Task_alloc(const TaskSpec *spec,
int64_t task_spec_size,
int state,
TaskStatus state,
DBClientID local_scheduler_id,
const std::vector<ObjectID> &execution_dependencies);

Task *Task_alloc(TaskExecutionSpec &execution_spec,
int state,
TaskStatus state,
DBClientID local_scheduler_id);

/**
@ -582,10 +587,10 @@ Task *Task_copy(Task *other);
int64_t Task_size(Task *task);

/** The scheduling state of the task. */
int Task_state(Task *task);
TaskStatus Task_state(Task *task);

/** Update the schedule state of the task. */
void Task_set_state(Task *task, int state);
void Task_set_state(Task *task, TaskStatus state);

/** Local scheduler this task has been assigned to or is running on. */
DBClientID Task_local_scheduler(Task *task);
@ -136,7 +136,7 @@ int64_t task_table_delayed_add_task(event_loop *loop,

void task_table_test_callback(Task *callback_task, void *user_data) {
task_table_test_callback_called = 1;
RAY_CHECK(Task_state(callback_task) == TASK_STATUS_SCHEDULED);
RAY_CHECK(Task_state(callback_task) == TaskStatus::SCHEDULED);
RAY_CHECK(Task_size(callback_task) == Task_size(task_table_test_task));
RAY_CHECK(Task_equals(callback_task, task_table_test_task));
event_loop *loop = (event_loop *) user_data;
@ -152,13 +152,13 @@ TEST task_table_test(void) {
DBClientID local_scheduler_id = DBClientID::from_random();
TaskExecutionSpec spec = example_task_execution_spec(1, 1);
task_table_test_task =
Task_alloc(spec, TASK_STATUS_SCHEDULED, local_scheduler_id);
Task_alloc(spec, TaskStatus::SCHEDULED, local_scheduler_id);
RetryInfo retry = {
.num_retries = NUM_RETRIES,
.timeout = TIMEOUT,
.fail_callback = task_table_test_fail_callback,
};
task_table_subscribe(db, local_scheduler_id, TASK_STATUS_SCHEDULED,
task_table_subscribe(db, local_scheduler_id, TaskStatus::SCHEDULED,
task_table_test_callback, (void *) loop, &retry, NULL,
(void *) loop);
event_loop_add_timer(
@ -186,13 +186,13 @@ TEST task_table_all_test(void) {
TaskExecutionSpec spec = example_task_execution_spec(1, 1);
/* Schedule two tasks on different local local schedulers. */
Task *task1 =
Task_alloc(spec, TASK_STATUS_SCHEDULED, DBClientID::from_random());
Task_alloc(spec, TaskStatus::SCHEDULED, DBClientID::from_random());
Task *task2 =
Task_alloc(spec, TASK_STATUS_SCHEDULED, DBClientID::from_random());
Task_alloc(spec, TaskStatus::SCHEDULED, DBClientID::from_random());
RetryInfo retry = {
.num_retries = NUM_RETRIES, .timeout = TIMEOUT, .fail_callback = NULL,
};
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_SCHEDULED,
task_table_subscribe(db, UniqueID::nil(), TaskStatus::SCHEDULED,
task_table_all_test_callback, NULL, &retry, NULL, NULL);
event_loop_add_timer(loop, 50, (event_loop_timer_handler) timeout_handler,
NULL);
@ -211,7 +211,7 @@ TEST task_table_all_test(void) {
}

TEST unique_client_id_test(void) {
enum { num_conns = 100 };
const int num_conns = 100;

DBClientID ids[num_conns];
DBHandle *db;
@ -42,7 +42,7 @@ static inline TaskExecutionSpec example_task_execution_spec(

static inline Task *example_task_with_args(int64_t num_args,
int64_t num_returns,
int task_state,
TaskStatus task_state,
ObjectID arg_ids[]) {
TaskExecutionSpec spec =
example_task_execution_spec_with_args(num_args, num_returns, arg_ids);
@ -52,7 +52,7 @@ static inline Task *example_task_with_args(int64_t num_args,

static inline Task *example_task(int64_t num_args,
int64_t num_returns,
int task_state) {
TaskStatus task_state) {
TaskExecutionSpec spec = example_task_execution_spec(num_args, num_returns);
Task *instance = Task_alloc(spec, task_state, UniqueID::nil());
return instance;
@ -25,8 +25,9 @@ TEST ipc_socket_test(void) {
socket_fd = connect_ipc_sock(socket_pathname);
ASSERT(socket_fd >= 0);
write_log_message(socket_fd, test_string);
write_message(socket_fd, LOG_MESSAGE, strlen(test_bytes),
(uint8_t *) test_bytes);
write_message(socket_fd,
static_cast<int64_t>(CommonMessageType::LOG_MESSAGE),
strlen(test_bytes), (uint8_t *) test_bytes);
close(socket_fd);
exit(0);
} else {
@ -40,7 +41,8 @@ TEST ipc_socket_test(void) {
int64_t len;
uint8_t *bytes;
read_message(client_fd, &type, &len, &bytes);
ASSERT(type == LOG_MESSAGE);
ASSERT(static_cast<CommonMessageType>(type) ==
CommonMessageType::LOG_MESSAGE);
ASSERT(memcmp(test_bytes, bytes, len) == 0);
free(bytes);
close(client_fd);
@ -69,8 +71,9 @@ TEST long_ipc_socket_test(void) {
socket_fd = connect_ipc_sock(socket_pathname);
ASSERT(socket_fd >= 0);
write_log_message(socket_fd, test_string.c_str());
write_message(socket_fd, LOG_MESSAGE, strlen(test_bytes),
(uint8_t *) test_bytes);
write_message(socket_fd,
static_cast<int64_t>(CommonMessageType::LOG_MESSAGE),
strlen(test_bytes), (uint8_t *) test_bytes);
close(socket_fd);
exit(0);
} else {
@ -84,7 +87,8 @@ TEST long_ipc_socket_test(void) {
int64_t len;
uint8_t *bytes;
read_message(client_fd, &type, &len, &bytes);
ASSERT(type == LOG_MESSAGE);
ASSERT(static_cast<CommonMessageType>(type) ==
CommonMessageType::LOG_MESSAGE);
ASSERT(memcmp(test_bytes, bytes, len) == 0);
free(bytes);
close(client_fd);
@ -79,7 +79,7 @@ TEST new_object_test(void) {
new_object_failed = 0;
new_object_succeeded = 0;
new_object_id = ObjectID::from_random();
new_object_task = example_task(1, 1, TASK_STATUS_WAITING);
new_object_task = example_task(1, 1, TaskStatus::WAITING);
new_object_task_spec = Task_task_execution_spec(new_object_task)->Spec();
new_object_task_id = TaskSpec_task_id(new_object_task_spec);
g_loop = event_loop_create();
@ -91,7 +91,7 @@ TEST new_object_test(void) {
.timeout = 100,
.fail_callback = new_object_fail_callback,
};
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
task_table_subscribe(db, UniqueID::nil(), TaskStatus::WAITING, NULL, NULL,
&retry, task_table_subscribe_done, db);
event_loop_run(g_loop);
db_disconnect(db);
@ -105,7 +105,7 @@ void subscribe_success_callback(TaskID task_id, void *context) {
}

TEST add_lookup_test(void) {
add_lookup_task = example_task(1, 1, TASK_STATUS_WAITING);
add_lookup_task = example_task(1, 1, TaskStatus::WAITING);
g_loop = event_loop_create();
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", std::vector<std::string>());
@ -116,7 +116,7 @@ TEST add_lookup_test(void) {
.fail_callback = add_lookup_fail_callback,
};
/* Wait for subscription to succeed before adding the task. */
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
task_table_subscribe(db, UniqueID::nil(), TaskStatus::WAITING, NULL, NULL,
&retry, subscribe_success_callback, (void *) db);
/* Disconnect the database to see if the lookup times out. */
event_loop_run(g_loop);
@ -156,7 +156,7 @@ TEST subscribe_timeout_test(void) {
.timeout = 100,
.fail_callback = subscribe_fail_callback,
};
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
task_table_subscribe(db, UniqueID::nil(), TaskStatus::WAITING, NULL, NULL,
&retry, subscribe_done_callback,
(void *) subscribe_timeout_context);
/* Disconnect the database to see if the subscribe times out. */
@ -194,11 +194,11 @@ TEST publish_timeout_test(void) {
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", std::vector<std::string>());
db_attach(db, g_loop, false);
Task *task = example_task(1, 1, TASK_STATUS_WAITING);
Task *task = example_task(1, 1, TaskStatus::WAITING);
RetryInfo retry = {
.num_retries = 5, .timeout = 100, .fail_callback = publish_fail_callback,
};
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
task_table_subscribe(db, UniqueID::nil(), TaskStatus::WAITING, NULL, NULL,
&retry, NULL, NULL);
task_table_add_task(db, task, &retry, publish_done_callback,
(void *) publish_timeout_context);
@ -270,7 +270,7 @@ TEST subscribe_retry_test(void) {
.timeout = 100,
.fail_callback = subscribe_retry_fail_callback,
};
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
task_table_subscribe(db, UniqueID::nil(), TaskStatus::WAITING, NULL, NULL,
&retry, subscribe_retry_done_callback,
(void *) subscribe_retry_context);
/* Disconnect the database to see if the subscribe times out. */
@ -315,13 +315,13 @@ TEST publish_retry_test(void) {
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", std::vector<std::string>());
db_attach(db, g_loop, false);
Task *task = example_task(1, 1, TASK_STATUS_WAITING);
Task *task = example_task(1, 1, TaskStatus::WAITING);
RetryInfo retry = {
.num_retries = 5,
.timeout = 100,
.fail_callback = publish_retry_fail_callback,
};
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
task_table_subscribe(db, UniqueID::nil(), TaskStatus::WAITING, NULL, NULL,
&retry, NULL, NULL);
task_table_add_task(db, task, &retry, publish_retry_done_callback,
(void *) publish_retry_context);
@ -374,7 +374,7 @@ TEST subscribe_late_test(void) {
.timeout = 0,
.fail_callback = subscribe_late_fail_callback,
};
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
task_table_subscribe(db, UniqueID::nil(), TaskStatus::WAITING, NULL, NULL,
&retry, subscribe_late_done_callback,
(void *) subscribe_late_context);
/* Install handler for terminating the event loop. */
@ -414,13 +414,13 @@ TEST publish_late_test(void) {
DBHandle *db = db_connect(std::string("127.0.0.1"), 6379, "plasma_manager",
"127.0.0.1", std::vector<std::string>());
db_attach(db, g_loop, false);
Task *task = example_task(1, 1, TASK_STATUS_WAITING);
Task *task = example_task(1, 1, TaskStatus::WAITING);
RetryInfo retry = {
.num_retries = 0,
.timeout = 0,
.fail_callback = publish_late_fail_callback,
};
task_table_subscribe(db, UniqueID::nil(), TASK_STATUS_WAITING, NULL, NULL,
task_table_subscribe(db, UniqueID::nil(), TaskStatus::WAITING, NULL, NULL,
NULL, NULL, NULL);
task_table_add_task(db, task, &retry, publish_late_done_callback,
(void *) publish_late_context);
@ -181,13 +181,15 @@ TEST send_task(void) {
TaskSpec *spec = TaskSpec_finish_construct(builder, &size);
int fd[2];
socketpair(AF_UNIX, SOCK_STREAM, 0, fd);
write_message(fd[0], SUBMIT_TASK, size, (uint8_t *) spec);
write_message(fd[0], static_cast<int64_t>(CommonMessageType::SUBMIT_TASK),
size, (uint8_t *) spec);
int64_t type;
int64_t length;
uint8_t *message;
read_message(fd[1], &type, &length, &message);
TaskSpec *result = (TaskSpec *) message;
ASSERT(type == SUBMIT_TASK);
ASSERT(static_cast<CommonMessageType>(type) ==
CommonMessageType::SUBMIT_TASK);
ASSERT(memcmp(spec, result, size) == 0);
TaskSpec_free(spec);
free(result);
@ -31,7 +31,7 @@ void assign_task_to_local_scheduler_retry(UniqueID id,
void *user_data) {
GlobalSchedulerState *state = (GlobalSchedulerState *) user_context;
Task *task = (Task *) user_data;
RAY_CHECK(Task_state(task) == TASK_STATUS_SCHEDULED);
RAY_CHECK(Task_state(task) == TaskStatus::SCHEDULED);

// If the local scheduler has died since we requested the task assignment, do
// not retry again.
@ -71,7 +71,7 @@ void assign_task_to_local_scheduler(GlobalSchedulerState *state,
TaskSpec *spec = Task_task_execution_spec(task)->Spec();
RAY_LOG(DEBUG) << "assigning task to local_scheduler_id = "
<< local_scheduler_id;
Task_set_state(task, TASK_STATUS_SCHEDULED);
Task_set_state(task, TaskStatus::SCHEDULED);
Task_set_local_scheduler(task, local_scheduler_id);
RAY_LOG(DEBUG) << "Issuing a task table update for task = "
<< Task_task_id(task);
@ -438,7 +438,7 @@ void start_server(const char *node_ip_address,
* submits tasks to the global scheduler before the global scheduler
* successfully subscribes, then the local scheduler that submitted the tasks
* will retry. */
task_table_subscribe(g_state->db, UniqueID::nil(), TASK_STATUS_WAITING,
task_table_subscribe(g_state->db, UniqueID::nil(), TaskStatus::WAITING,
process_task_waiting, (void *) g_state, NULL, NULL,
NULL);
|
|
@ -15,11 +15,11 @@
|
|||
*
|
||||
*/
|
||||
|
||||
typedef enum {
|
||||
enum class GlobalSchedulerAlgorithm {
|
||||
SCHED_ALGORITHM_ROUND_ROBIN = 1,
|
||||
SCHED_ALGORITHM_TRANSFER_AWARE = 2,
|
||||
SCHED_ALGORITHM_MAX
|
||||
} global_scheduler_algorithm;
|
||||
};
|
||||
|
||||
/// The class encapsulating state managed by the global scheduling policy.
|
||||
class GlobalSchedulerPolicyState {
|
||||
|
|
|
@ -50,7 +50,7 @@ set(LOCAL_SCHEDULER_FBS_OUTPUT_FILES
|
|||
|
||||
add_custom_command(
|
||||
OUTPUT ${LOCAL_SCHEDULER_FBS_OUTPUT_FILES}
|
||||
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${LOCAL_SCHEDULER_FBS_SRC} --gen-object-api
|
||||
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${LOCAL_SCHEDULER_FBS_SRC} --gen-object-api --scoped-enums
|
||||
DEPENDS ${FBS_DEPENDS}
|
||||
COMMENT "Running flatc compiler on ${LOCAL_SCHEDULER_FBS_SRC}"
|
||||
VERBATIM)
|
||||
|
|
|
@ -28,6 +28,8 @@
|
|||
#include "state/object_table.h"
|
||||
#include "state/error_table.h"
|
||||
|
||||
using MessageType = ray::local_scheduler::protocol::MessageType;
|
||||
|
||||
/**
|
||||
* A helper function for printing available and requested resource information.
|
||||
*
|
||||
|
@ -133,7 +135,7 @@ void kill_worker(LocalSchedulerState *state,
|
|||
error_message << "The worker with ID " << worker->client_id << " died or "
|
||||
<< "was killed while executing the task with ID "
|
||||
<< TaskSpec_task_id(spec);
|
||||
push_error(state->db, TaskSpec_driver_id(spec), WORKER_DIED_ERROR_INDEX,
|
||||
push_error(state->db, TaskSpec_driver_id(spec), ErrorIndex::WORKER_DIED,
|
||||
error_message.str());
|
||||
}
|
||||
|
||||
|
@ -141,7 +143,7 @@ void kill_worker(LocalSchedulerState *state,
|
|||
if (worker->task_in_progress) {
|
||||
/* Update the task table to reflect that the task failed to complete. */
|
||||
if (state->db != NULL) {
|
||||
Task_set_state(worker->task_in_progress, TASK_STATUS_LOST);
|
||||
Task_set_state(worker->task_in_progress, TaskStatus::LOST);
|
||||
#if !RAY_USE_NEW_GCS
|
||||
task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL);
|
||||
#else
|
||||
|
@ -534,7 +536,7 @@ void assign_task_to_worker(LocalSchedulerState *state,
|
|||
fbb.Finish(message);
|
||||
|
||||
if (write_message(worker->sock,
|
||||
ray::local_scheduler::protocol::MessageType_ExecuteTask,
|
||||
static_cast<int64_t>(MessageType::ExecuteTask),
|
||||
fbb.GetSize(), (uint8_t *) fbb.GetBufferPointer()) < 0) {
|
||||
if (errno == EPIPE || errno == EBADF) {
|
||||
/* Something went wrong, so kill the worker. */
|
||||
|
@ -547,7 +549,7 @@ void assign_task_to_worker(LocalSchedulerState *state,
|
|||
}
|
||||
|
||||
Task *task =
|
||||
Task_alloc(execution_spec, TASK_STATUS_RUNNING,
|
||||
Task_alloc(execution_spec, TaskStatus::RUNNING,
|
||||
state->db ? get_db_client_id(state->db) : DBClientID::nil());
|
||||
/* Record which task this worker is executing. This will be freed in
|
||||
* process_message when the worker sends a GetTask message to the local
|
||||
|
@ -625,7 +627,7 @@ void finish_task(LocalSchedulerState *state, LocalSchedulerClient *worker) {
|
|||
/* If we're connected to Redis, update tables. */
|
||||
if (state->db != NULL) {
|
||||
/* Update control state tables. */
|
||||
int task_state = TASK_STATUS_DONE;
|
||||
TaskStatus task_state = TaskStatus::DONE;
|
||||
Task_set_state(worker->task_in_progress, task_state);
|
||||
#if !RAY_USE_NEW_GCS
|
||||
auto retryInfo = RetryInfo{
|
||||
|
@ -692,12 +694,13 @@ void reconstruct_task_update_callback(Task *task,
|
|||
#if !RAY_USE_NEW_GCS
|
||||
task_table_test_and_update(state->db, Task_task_id(task),
|
||||
current_local_scheduler_id, Task_state(task),
|
||||
TASK_STATUS_RECONSTRUCTING, NULL,
|
||||
TaskStatus::RECONSTRUCTING, NULL,
|
||||
reconstruct_task_update_callback, state);
|
||||
#else
|
||||
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
|
||||
&state->gcs_client, Task_task_id(task), current_local_scheduler_id,
|
||||
Task_state(task), SchedulingState_RECONSTRUCTING,
|
||||
static_cast<SchedulingState>(Task_state(task)),
|
||||
SchedulingState::RECONSTRUCTING,
|
||||
[task, user_context](gcs::AsyncGcsClient *, const ray::TaskID &,
|
||||
const TaskTableDataT &t, bool updated) {
|
||||
reconstruct_task_update_callback(task, user_context, updated);
|
||||
|
@ -750,19 +753,20 @@ void reconstruct_put_task_update_callback(Task *task,
|
|||
#if !RAY_USE_NEW_GCS
|
||||
task_table_test_and_update(state->db, Task_task_id(task),
|
||||
current_local_scheduler_id, Task_state(task),
|
||||
TASK_STATUS_RECONSTRUCTING, NULL,
|
||||
TaskStatus::RECONSTRUCTING, NULL,
|
||||
reconstruct_put_task_update_callback, state);
|
||||
#else
|
||||
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
|
||||
&state->gcs_client, Task_task_id(task), current_local_scheduler_id,
|
||||
Task_state(task), SchedulingState_RECONSTRUCTING,
|
||||
static_cast<SchedulingState>(Task_state(task)),
|
||||
SchedulingState::RECONSTRUCTING,
|
||||
[task, user_context](gcs::AsyncGcsClient *, const ray::TaskID &,
|
||||
const TaskTableDataT &, bool updated) {
|
||||
reconstruct_put_task_update_callback(task, user_context, updated);
|
||||
}));
|
||||
Task_free(task);
|
||||
#endif
|
||||
} else if (Task_state(task) == TASK_STATUS_RUNNING) {
|
||||
} else if (Task_state(task) == TaskStatus::RUNNING) {
|
||||
/* (1) The task is still executing on a live node. The object created
|
||||
* by `ray.put` was not able to be reconstructed, and the workload will
|
||||
* likely hang. Push an error to the appropriate driver. */
|
||||
|
@ -773,7 +777,7 @@ void reconstruct_put_task_update_callback(Task *task,
|
|||
<< " is still executing and so the object created by "
|
||||
<< "ray.put could not be reconstructed.";
|
||||
push_error(state->db, TaskSpec_driver_id(spec),
|
||||
PUT_RECONSTRUCTION_ERROR_INDEX, error_message.str());
|
||||
ErrorIndex::PUT_RECONSTRUCTION, error_message.str());
|
||||
}
|
||||
} else {
|
||||
/* (1) The task is still executing and it is the driver task. We cannot
|
||||
|
@ -786,10 +790,10 @@ void reconstruct_put_task_update_callback(Task *task,
|
|||
<< " is a driver task and so the object created by ray.put "
|
||||
<< "could not be reconstructed.";
|
||||
push_error(state->db, TaskSpec_driver_id(spec),
|
||||
PUT_RECONSTRUCTION_ERROR_INDEX, error_message.str());
|
||||
ErrorIndex::PUT_RECONSTRUCTION, error_message.str());
|
||||
}
|
||||
} else {
|
||||
/* The update to TASK_STATUS_RECONSTRUCTING succeeded, so continue with
|
||||
/* The update to TaskStatus::RECONSTRUCTING succeeded, so continue with
|
||||
* reconstruction as usual. */
|
||||
reconstruct_task_update_callback(task, user_context, updated);
|
||||
}
|
||||
|
@ -818,19 +822,21 @@ void reconstruct_evicted_result_lookup_callback(ObjectID reconstruct_object_id,
|
|||
* claim responsibility for reconstruction. */
|
||||
#if !RAY_USE_NEW_GCS
|
||||
task_table_test_and_update(state->db, task_id, DBClientID::nil(),
|
||||
(TASK_STATUS_DONE | TASK_STATUS_LOST),
|
||||
TASK_STATUS_RECONSTRUCTING, NULL, done_callback,
|
||||
(TaskStatus::DONE | TaskStatus::LOST),
|
||||
TaskStatus::RECONSTRUCTING, NULL, done_callback,
|
||||
state);
|
||||
#else
|
||||
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
|
||||
&state->gcs_client, task_id, DBClientID::nil(),
|
||||
SchedulingState_DONE | SchedulingState_LOST,
|
||||
SchedulingState_RECONSTRUCTING,
|
||||
static_cast<SchedulingState>(static_cast<uint>(SchedulingState::DONE) |
|
||||
static_cast<uint>(SchedulingState::LOST)),
|
||||
SchedulingState::RECONSTRUCTING,
|
||||
[done_callback, state](gcs::AsyncGcsClient *, const ray::TaskID &,
|
||||
const TaskTableDataT &t, bool updated) {
|
||||
Task *task = Task_alloc(
|
||||
t.task_info.data(), t.task_info.size(), t.scheduling_state,
|
||||
DBClientID::from_binary(t.scheduler_id), std::vector<ObjectID>());
|
||||
Task *task = Task_alloc(t.task_info.data(), t.task_info.size(),
|
||||
static_cast<TaskStatus>(t.scheduling_state),
|
||||
DBClientID::from_binary(t.scheduler_id),
|
||||
std::vector<ObjectID>());
|
||||
done_callback(task, state, updated);
|
||||
Task_free(task);
|
||||
}));
|
||||
|
@ -855,17 +861,18 @@ void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id,
|
|||
* reconstruction. */
|
||||
#if !RAY_USE_NEW_GCS
|
||||
task_table_test_and_update(state->db, task_id, DBClientID::nil(),
|
||||
TASK_STATUS_LOST, TASK_STATUS_RECONSTRUCTING, NULL,
|
||||
TaskStatus::LOST, TaskStatus::RECONSTRUCTING, NULL,
|
||||
reconstruct_task_update_callback, state);
|
||||
#else
|
||||
RAY_CHECK_OK(gcs::TaskTableTestAndUpdate(
|
||||
&state->gcs_client, task_id, DBClientID::nil(), SchedulingState_LOST,
|
||||
SchedulingState_RECONSTRUCTING,
|
||||
&state->gcs_client, task_id, DBClientID::nil(), SchedulingState::LOST,
|
||||
SchedulingState::RECONSTRUCTING,
|
||||
[state](gcs::AsyncGcsClient *, const ray::TaskID &,
|
||||
const TaskTableDataT &t, bool updated) {
|
||||
Task *task = Task_alloc(
|
||||
t.task_info.data(), t.task_info.size(), t.scheduling_state,
|
||||
DBClientID::from_binary(t.scheduler_id), std::vector<ObjectID>());
|
||||
Task *task = Task_alloc(t.task_info.data(), t.task_info.size(),
|
||||
static_cast<TaskStatus>(t.scheduling_state),
|
||||
DBClientID::from_binary(t.scheduler_id),
|
||||
std::vector<ObjectID>());
|
||||
reconstruct_task_update_callback(task, state, updated);
|
||||
Task_free(task);
|
||||
}));
|
||||
|
@ -1036,10 +1043,9 @@ void handle_get_actor_frontier(LocalSchedulerState *state,
|
|||
fbb.CreateVector(task_counter_vector), to_flatbuf(fbb, frontier_vector));
|
||||
fbb.Finish(reply);
|
||||
/* Respond with the built ActorFrontier. */
|
||||
if (write_message(
|
||||
worker->sock,
|
||||
ray::local_scheduler::protocol::MessageType_GetActorFrontierReply,
|
||||
fbb.GetSize(), (uint8_t *) fbb.GetBufferPointer()) < 0) {
|
||||
if (write_message(worker->sock,
|
||||
static_cast<int64_t>(MessageType::GetActorFrontierReply),
|
||||
fbb.GetSize(), (uint8_t *) fbb.GetBufferPointer()) < 0) {
|
||||
if (errno == EPIPE || errno == EBADF) {
|
||||
/* Something went wrong, so kill the worker. */
|
||||
kill_worker(state, worker, false, false);
|
||||
|
@ -1087,7 +1093,7 @@ void process_message(event_loop *loop,
|
|||
RAY_LOG(DEBUG) << "New event of type " << type;
|
||||
|
||||
switch (type) {
|
||||
case ray::local_scheduler::protocol::MessageType_SubmitTask: {
|
||||
case static_cast<int64_t>(MessageType::SubmitTask): {
|
||||
auto message =
|
||||
flatbuffers::GetRoot<ray::local_scheduler::protocol::SubmitTaskRequest>(
|
||||
input);
|
||||
|
@ -1117,9 +1123,9 @@ void process_message(event_loop *loop,
|
|||
execution_spec);
|
||||
}
|
||||
} break;
|
||||
case ray::local_scheduler::protocol::MessageType_TaskDone: {
|
||||
case static_cast<int64_t>(MessageType::TaskDone): {
|
||||
} break;
|
||||
case ray::local_scheduler::protocol::MessageType_DisconnectClient: {
|
||||
case static_cast<int64_t>(MessageType::DisconnectClient): {
|
||||
finish_task(state, worker);
|
||||
RAY_CHECK(!worker->disconnected);
|
||||
worker->disconnected = true;
|
||||
|
@ -1129,7 +1135,7 @@ void process_message(event_loop *loop,
|
|||
start_worker(state);
|
||||
}
|
||||
} break;
|
||||
case ray::local_scheduler::protocol::MessageType_EventLogMessage: {
|
||||
case static_cast<int64_t>(MessageType::EventLogMessage): {
|
||||
/* Parse the message. */
|
||||
auto message =
|
||||
flatbuffers::GetRoot<ray::local_scheduler::protocol::EventLogMessage>(
|
||||
|
@ -1141,12 +1147,12 @@ void process_message(event_loop *loop,
|
|||
message->value()->size(), message->timestamp());
|
||||
}
|
||||
} break;
|
||||
case ray::local_scheduler::protocol::MessageType_RegisterClientRequest: {
|
||||
case static_cast<int64_t>(MessageType::RegisterClientRequest): {
|
||||
auto message = flatbuffers::GetRoot<
|
||||
ray::local_scheduler::protocol::RegisterClientRequest>(input);
|
||||
handle_client_register(state, worker, message);
|
||||
} break;
|
||||
case ray::local_scheduler::protocol::MessageType_GetTask: {
|
||||
case static_cast<int64_t>(MessageType::GetTask): {
|
||||
/* If this worker reports a completed task, account for resources. */
|
||||
finish_task(state, worker);
|
||||
/* Let the scheduling algorithm process the fact that there is an available
|
||||
|
@ -1157,7 +1163,7 @@ void process_message(event_loop *loop,
|
|||
handle_actor_worker_available(state, state->algorithm_state, worker);
|
||||
}
|
||||
} break;
|
||||
case ray::local_scheduler::protocol::MessageType_ReconstructObject: {
|
||||
case static_cast<int64_t>(MessageType::ReconstructObject): {
|
||||
auto message =
|
||||
flatbuffers::GetRoot<ray::local_scheduler::protocol::ReconstructObject>(
|
||||
input);
|
||||
|
@ -1186,11 +1192,11 @@ void process_message(event_loop *loop,
|
|||
}
|
||||
reconstruct_object(state, from_flatbuf(*message->object_id()));
|
||||
} break;
|
||||
case DISCONNECT_CLIENT: {
|
||||
case static_cast<int64_t>(CommonMessageType::DISCONNECT_CLIENT): {
|
||||
RAY_LOG(DEBUG) << "Disconnecting client on fd " << client_sock;
|
||||
handle_client_disconnect(state, worker);
|
||||
} break;
|
||||
case ray::local_scheduler::protocol::MessageType_NotifyUnblocked: {
|
||||
case static_cast<int64_t>(MessageType::NotifyUnblocked): {
|
||||
/* TODO(rkn): A driver may call this as well, right? */
|
||||
if (worker->task_in_progress != NULL) {
|
||||
/* If the worker was executing a task (i.e. non-driver), update its
|
||||
|
@ -1218,19 +1224,19 @@ void process_message(event_loop *loop,
|
|||
}
|
||||
print_worker_info("Worker unblocked", state->algorithm_state);
|
||||
} break;
|
||||
case ray::local_scheduler::protocol::MessageType_PutObject: {
|
||||
case static_cast<int64_t>(MessageType::PutObject): {
|
||||
auto message =
|
||||
flatbuffers::GetRoot<ray::local_scheduler::protocol::PutObject>(input);
|
||||
result_table_add(state->db, from_flatbuf(*message->object_id()),
|
||||
from_flatbuf(*message->task_id()), true, NULL, NULL, NULL);
|
||||
} break;
|
||||
case ray::local_scheduler::protocol::MessageType_GetActorFrontierRequest: {
|
||||
case static_cast<int64_t>(MessageType::GetActorFrontierRequest): {
|
||||
auto message = flatbuffers::GetRoot<
|
||||
ray::local_scheduler::protocol::GetActorFrontierRequest>(input);
|
||||
ActorID actor_id = from_flatbuf(*message->actor_id());
|
||||
handle_get_actor_frontier(state, worker, actor_id);
|
||||
} break;
|
||||
case ray::local_scheduler::protocol::MessageType_SetActorFrontier: {
|
||||
case static_cast<int64_t>(MessageType::SetActorFrontier): {
|
||||
auto message =
|
||||
flatbuffers::GetRoot<ray::local_scheduler::protocol::ActorFrontier>(
|
||||
input);
|
||||
|
@ -1439,7 +1445,7 @@ void start_server(
|
|||
* scheduler before the call to subscribe. */
|
||||
if (g_state->db != NULL) {
|
||||
task_table_subscribe(g_state->db, get_db_client_id(g_state->db),
|
||||
TASK_STATUS_SCHEDULED, handle_task_scheduled_callback,
|
||||
TaskStatus::SCHEDULED, handle_task_scheduled_callback,
|
||||
g_state, NULL, NULL, NULL);
|
||||
}
|
||||
/* Subscribe to notifications about newly created actors. */
|
||||
|
|
|
@ -387,7 +387,7 @@ void finish_killed_task(LocalSchedulerState *state,
|
|||
}
|
||||
/* Mark the task as done. */
|
||||
if (state->db != NULL) {
|
||||
Task *task = Task_alloc(execution_spec, TASK_STATUS_DONE,
|
||||
Task *task = Task_alloc(execution_spec, TaskStatus::DONE,
|
||||
get_db_client_id(state->db));
|
||||
#if !RAY_USE_NEW_GCS
|
||||
// In most cases, task_table_update would be appropriate, however, it is
|
||||
|
@ -502,7 +502,7 @@ void queue_actor_task(LocalSchedulerState *state,
|
|||
|
||||
/* Update the task table. */
|
||||
if (state->db != NULL) {
|
||||
Task *task = Task_alloc(execution_spec, TASK_STATUS_QUEUED,
|
||||
Task *task = Task_alloc(execution_spec, TaskStatus::QUEUED,
|
||||
get_db_client_id(state->db));
|
||||
if (from_global_scheduler) {
|
||||
/* If the task is from the global scheduler, it's already been added to
|
||||
|
@ -887,7 +887,7 @@ void spillback_tasks_handler(LocalSchedulerState *state) {
|
|||
<< " ";
|
||||
}
|
||||
push_error(state->db, TaskSpec_driver_id(spec),
|
||||
ACTOR_NOT_CREATED_ERROR_INDEX, error_message.str());
|
||||
ErrorIndex::ACTOR_NOT_CREATED, error_message.str());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1001,7 +1001,7 @@ std::list<TaskExecutionSpec>::iterator queue_task(
|
|||
* task table to notify others that we have queued it. */
|
||||
if (state->db != NULL) {
|
||||
Task *task =
|
||||
Task_alloc(task_entry, TASK_STATUS_QUEUED, get_db_client_id(state->db));
|
||||
Task_alloc(task_entry, TaskStatus::QUEUED, get_db_client_id(state->db));
|
||||
#if !RAY_USE_NEW_GCS
|
||||
if (from_global_scheduler) {
|
||||
/* If the task is from the global scheduler, it's already been added to
|
||||
|
@ -1132,7 +1132,7 @@ void give_task_to_local_scheduler_retry(UniqueID id,
|
|||
void *user_data) {
|
||||
LocalSchedulerState *state = (LocalSchedulerState *) user_context;
|
||||
Task *task = (Task *) user_data;
|
||||
RAY_CHECK(Task_state(task) == TASK_STATUS_SCHEDULED);
|
||||
RAY_CHECK(Task_state(task) == TaskStatus::SCHEDULED);
|
||||
|
||||
TaskExecutionSpec *execution_spec = Task_task_execution_spec(task);
|
||||
TaskSpec *spec = execution_spec->Spec();
|
||||
|
@ -1203,7 +1203,7 @@ void give_task_to_local_scheduler(LocalSchedulerState *state,
|
|||
/* Assign the task to the relevant local scheduler. */
|
||||
RAY_CHECK(state->config.global_scheduler_exists);
|
||||
Task *task =
|
||||
Task_alloc(execution_spec, TASK_STATUS_SCHEDULED, local_scheduler_id);
|
||||
Task_alloc(execution_spec, TaskStatus::SCHEDULED, local_scheduler_id);
|
||||
#if !RAY_USE_NEW_GCS
|
||||
auto retryInfo = RetryInfo{
|
||||
.num_retries = 0, // This value is unused.
|
||||
|
@ -1223,7 +1223,7 @@ void give_task_to_global_scheduler_retry(UniqueID id,
|
|||
void *user_data) {
|
||||
LocalSchedulerState *state = (LocalSchedulerState *) user_context;
|
||||
Task *task = (Task *) user_data;
|
||||
RAY_CHECK(Task_state(task) == TASK_STATUS_WAITING);
|
||||
RAY_CHECK(Task_state(task) == TaskStatus::WAITING);
|
||||
|
||||
TaskExecutionSpec *execution_spec = Task_task_execution_spec(task);
|
||||
TaskSpec *spec = execution_spec->Spec();
|
||||
|
@ -1250,7 +1250,7 @@ void give_task_to_global_scheduler(LocalSchedulerState *state,
|
|||
}
|
||||
/* Pass on the task to the global scheduler. */
|
||||
RAY_CHECK(state->config.global_scheduler_exists);
|
||||
Task *task = Task_alloc(execution_spec, TASK_STATUS_WAITING,
|
||||
Task *task = Task_alloc(execution_spec, TaskStatus::WAITING,
|
||||
get_db_client_id(state->db));
|
||||
#if !RAY_USE_NEW_GCS
|
||||
RAY_CHECK(state->db != NULL);
|
||||
|
@ -1326,7 +1326,7 @@ void handle_actor_task_submitted(LocalSchedulerState *state,
|
|||
if (state->actor_mapping.count(actor_id) == 0) {
|
||||
// Create a copy of the task to write to the task table.
|
||||
Task *task = Task_alloc(
|
||||
task_spec, execution_spec.SpecSize(), TASK_STATUS_ACTOR_CACHED,
|
||||
task_spec, execution_spec.SpecSize(), TaskStatus::ACTOR_CACHED,
|
get_db_client_id(state->db), execution_spec.ExecutionDependencies());
/* Add this task to a queue of tasks that have been submitted but the local

@ -10,6 +10,8 @@
#include <sys/types.h>
#include <unistd.h>

using MessageType = ray::local_scheduler::protocol::MessageType;

LocalSchedulerConnection *LocalSchedulerConnection_init(
const char *local_scheduler_socket,
UniqueID client_id,
@ -26,8 +28,7 @@ LocalSchedulerConnection *LocalSchedulerConnection_init(
fbb.Finish(message);
/* Register the process ID with the local scheduler. */
int success = write_message(
result->conn,
ray::local_scheduler::protocol::MessageType_RegisterClientRequest,
result->conn, static_cast<int64_t>(MessageType::RegisterClientRequest),
fbb.GetSize(), fbb.GetBufferPointer());
RAY_CHECK(success == 0) << "Unable to register worker with local scheduler";

@ -43,8 +44,7 @@ void local_scheduler_disconnect_client(LocalSchedulerConnection *conn) {
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::local_scheduler::protocol::CreateDisconnectClient(fbb);
fbb.Finish(message);
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_DisconnectClient,
write_message(conn->conn, static_cast<int64_t>(MessageType::DisconnectClient),
fbb.GetSize(), fbb.GetBufferPointer());
}

@ -60,8 +60,7 @@ void local_scheduler_log_event(LocalSchedulerConnection *conn,
auto message = ray::local_scheduler::protocol::CreateEventLogMessage(
fbb, key_string, value_string, timestamp);
fbb.Finish(message);
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_EventLogMessage,
write_message(conn->conn, static_cast<int64_t>(MessageType::EventLogMessage),
fbb.GetSize(), fbb.GetBufferPointer());
}

@ -76,8 +75,7 @@ void local_scheduler_submit(LocalSchedulerConnection *conn,
auto message = ray::local_scheduler::protocol::CreateSubmitTaskRequest(
fbb, execution_dependencies, task_spec);
fbb.Finish(message);
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_SubmitTask,
write_message(conn->conn, static_cast<int64_t>(MessageType::SubmitTask),
fbb.GetSize(), fbb.GetBufferPointer());
}

@ -90,26 +88,25 @@ void local_scheduler_submit_raylet(
auto message = ray::local_scheduler::protocol::CreateSubmitTaskRequest(
fbb, execution_dependencies_message, task_spec.ToFlatbuffer(fbb));
fbb.Finish(message);
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_SubmitTask,
write_message(conn->conn, static_cast<int64_t>(MessageType::SubmitTask),
fbb.GetSize(), fbb.GetBufferPointer());
}

TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn,
int64_t *task_size) {
write_message(conn->conn, ray::local_scheduler::protocol::MessageType_GetTask,
0, NULL);
write_message(conn->conn, static_cast<int64_t>(MessageType::GetTask), 0,
NULL);
int64_t type;
int64_t reply_size;
uint8_t *reply;
/* Receive a task from the local scheduler. This will block until the local
* scheduler gives this client a task. */
read_message(conn->conn, &type, &reply_size, &reply);
if (type == DISCONNECT_CLIENT) {
if (type == static_cast<int64_t>(CommonMessageType::DISCONNECT_CLIENT)) {
RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection.";
exit(1);
}
RAY_CHECK(type == ray::local_scheduler::protocol::MessageType_ExecuteTask);
RAY_CHECK(static_cast<MessageType>(type) == MessageType::ExecuteTask);

/* Parse the flatbuffer object. */
auto reply_message =
@ -137,8 +134,8 @@ TaskSpec *local_scheduler_get_task(LocalSchedulerConnection *conn,
}

void local_scheduler_task_done(LocalSchedulerConnection *conn) {
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_TaskDone, 0, NULL);
write_message(conn->conn, static_cast<int64_t>(MessageType::TaskDone), 0,
NULL);
}

void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn,
@ -148,21 +145,19 @@ void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn,
fbb, to_flatbuf(fbb, object_id));
fbb.Finish(message);
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_ReconstructObject,
static_cast<int64_t>(MessageType::ReconstructObject),
fbb.GetSize(), fbb.GetBufferPointer());
/* TODO(swang): Propagate the error. */
}

void local_scheduler_log_message(LocalSchedulerConnection *conn) {
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_EventLogMessage, 0,
NULL);
write_message(conn->conn, static_cast<int64_t>(MessageType::EventLogMessage),
0, NULL);
}

void local_scheduler_notify_unblocked(LocalSchedulerConnection *conn) {
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_NotifyUnblocked, 0,
NULL);
write_message(conn->conn, static_cast<int64_t>(MessageType::NotifyUnblocked),
0, NULL);
}

void local_scheduler_put_object(LocalSchedulerConnection *conn,
@ -173,8 +168,7 @@ void local_scheduler_put_object(LocalSchedulerConnection *conn,
fbb, to_flatbuf(fbb, task_id), to_flatbuf(fbb, object_id));
fbb.Finish(message);

write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_PutObject,
write_message(conn->conn, static_cast<int64_t>(MessageType::PutObject),
fbb.GetSize(), fbb.GetBufferPointer());
}

@ -185,27 +179,26 @@ const std::vector<uint8_t> local_scheduler_get_actor_frontier(
auto message = ray::local_scheduler::protocol::CreateGetActorFrontierRequest(
fbb, to_flatbuf(fbb, actor_id));
fbb.Finish(message);
write_message(
conn->conn,
ray::local_scheduler::protocol::MessageType_GetActorFrontierRequest,
fbb.GetSize(), fbb.GetBufferPointer());
write_message(conn->conn,
static_cast<int64_t>(MessageType::GetActorFrontierRequest),
fbb.GetSize(), fbb.GetBufferPointer());

int64_t type;
std::vector<uint8_t> reply;
read_vector(conn->conn, &type, reply);
if (type == DISCONNECT_CLIENT) {
if (static_cast<CommonMessageType>(type) ==
CommonMessageType::DISCONNECT_CLIENT) {
RAY_LOG(DEBUG) << "Exiting because local scheduler closed connection.";
exit(1);
}
RAY_CHECK(type ==
ray::local_scheduler::protocol::MessageType_GetActorFrontierReply);
RAY_CHECK(static_cast<MessageType>(type) ==
MessageType::GetActorFrontierReply);
return reply;
}

void local_scheduler_set_actor_frontier(LocalSchedulerConnection *conn,
const std::vector<uint8_t> &frontier) {
write_message(conn->conn,
ray::local_scheduler::protocol::MessageType_SetActorFrontier,
write_message(conn->conn, static_cast<int64_t>(MessageType::SetActorFrontier),
frontier.size(), const_cast<uint8_t *>(frontier.data()));
}

@ -221,14 +214,16 @@ std::pair<std::vector<ObjectID>, std::vector<ObjectID>> local_scheduler_wait(
fbb, to_flatbuf(fbb, object_ids), num_returns, timeout_milliseconds,
wait_local);
fbb.Finish(message);
write_message(conn->conn, ray::protocol::MessageType_WaitRequest,
write_message(conn->conn,
static_cast<int64_t>(ray::protocol::MessageType::WaitRequest),
fbb.GetSize(), fbb.GetBufferPointer());
// Read result.
int64_t type;
int64_t reply_size;
uint8_t *reply;
read_message(conn->conn, &type, &reply_size, &reply);
RAY_CHECK(type == ray::protocol::MessageType_WaitReply);
RAY_CHECK(static_cast<ray::protocol::MessageType>(type) ==
ray::protocol::MessageType::WaitReply);
auto reply_message = flatbuffers::GetRoot<ray::protocol::WaitReply>(reply);
// Convert result.
std::pair<std::vector<ObjectID>, std::vector<ObjectID>> result;
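The client changes above all follow one pattern: the scoped enum value is cast to int64_t when a message is written, and the raw wire value is cast back before it is compared. A minimal standalone sketch of that pattern, with illustrative names rather than the real Ray client API:

#include <cstdint>
#include <iostream>

enum class MessageType : int64_t { RegisterClientRequest, ExecuteTask, DisconnectClient };

void write_message(int64_t type) {
  // The real client also writes a length and a flatbuffer payload to a socket.
  std::cout << "writing message type " << type << std::endl;
}

int main() {
  // Sending: a scoped enum does not convert implicitly, so the cast is explicit.
  write_message(static_cast<int64_t>(MessageType::RegisterClientRequest));

  // Receiving: cast the raw wire value back to the enum before comparing.
  int64_t wire_type = static_cast<int64_t>(MessageType::ExecuteTask);
  if (static_cast<MessageType>(wire_type) == MessageType::ExecuteTask) {
    std::cout << "got ExecuteTask" << std::endl;
  }
  return 0;
}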
@ -228,10 +228,10 @@ TEST object_reconstruction_test(void) {
event_loop_add_timer(local_scheduler->loop, 500,
(event_loop_timer_handler) timeout_handler, NULL);
event_loop_run(local_scheduler->loop);
/* Set the task's status to TASK_STATUS_DONE to prevent the race condition
/* Set the task's status to TaskStatus::DONE to prevent the race condition
* that would suppress object reconstruction. */
Task *task = Task_alloc(
execution_spec, TASK_STATUS_DONE,
execution_spec, TaskStatus::DONE,
get_db_client_id(local_scheduler->local_scheduler_state->db));
#if !RAY_USE_NEW_GCS
task_table_add_task(local_scheduler->local_scheduler_state->db, task, NULL,
@ -349,10 +349,10 @@ TEST object_reconstruction_recursive_test(void) {
event_loop_add_timer(local_scheduler->loop, 500,
(event_loop_timer_handler) timeout_handler, NULL);
event_loop_run(local_scheduler->loop);
/* Set the final task's status to TASK_STATUS_DONE to prevent the race
/* Set the final task's status to TaskStatus::DONE to prevent the race
* condition that would suppress object reconstruction. */
Task *last_task = Task_alloc(
specs[NUM_TASKS - 1], TASK_STATUS_DONE,
specs[NUM_TASKS - 1], TaskStatus::DONE,
get_db_client_id(local_scheduler->local_scheduler_state->db));
#if !RAY_USE_NEW_GCS
task_table_add_task(local_scheduler->local_scheduler_state->db, last_task,

@ -1309,7 +1309,7 @@ void log_object_hash_mismatch_error_task_callback(Task *task,
<< "hash. This may mean that a non-deterministic task was "
<< "reexecuted.";
push_error(state->db, TaskSpec_driver_id(spec),
OBJECT_HASH_MISMATCH_ERROR_INDEX, error_message.str());
ErrorIndex::OBJECT_HASH_MISMATCH, error_message.str());
}

void log_object_hash_mismatch_error_result_callback(ObjectID object_id,
@ -1327,9 +1327,10 @@ void log_object_hash_mismatch_error_result_callback(ObjectID object_id,
ray::JobID::nil(), task_id,
[user_context](gcs::AsyncGcsClient *, const TaskID &,
const TaskTableDataT &t) {
Task *task = Task_alloc(
t.task_info.data(), t.task_info.size(), t.scheduling_state,
DBClientID::from_binary(t.scheduler_id), std::vector<ObjectID>());
Task *task = Task_alloc(t.task_info.data(), t.task_info.size(),
static_cast<TaskStatus>(t.scheduling_state),
DBClientID::from_binary(t.scheduler_id),
std::vector<ObjectID>());
log_object_hash_mismatch_error_task_callback(task, user_context);
Task_free(task);
},
@ -1532,7 +1533,7 @@ void process_message(event_loop *loop,
ARROW_CHECK_OK(plasma::ReadStatusRequest(data, length, &object_id, 1));
process_status_request(conn, object_id);
} break;
case DISCONNECT_CLIENT: {
case static_cast<int64_t>(CommonMessageType::DISCONNECT_CLIENT): {
RAY_LOG(DEBUG) << "Disconnecting client on fd " << client_sock;
event_loop_remove_file(loop, client_sock);
ClientConnection_free(conn);

@ -134,7 +134,7 @@ template <class T>
void ClientConnection<T>::ProcessMessageHeader(const boost::system::error_code &error) {
if (error) {
// If there was an error, disconnect the client.
read_type_ = protocol::MessageType_DisconnectClient;
read_type_ = static_cast<int64_t>(protocol::MessageType::DisconnectClient);
read_length_ = 0;
ProcessMessage(error);
return;
@ -154,7 +154,7 @@ void ClientConnection<T>::ProcessMessage(const boost::system::error_code &
template <class T>
void ClientConnection<T>::ProcessMessage(const boost::system::error_code &error) {
if (error) {
read_type_ = protocol::MessageType_DisconnectClient;
read_type_ = static_cast<int64_t>(protocol::MessageType::DisconnectClient);
}
message_handler_(this->shared_from_this(), read_type_, read_message_.data());
}

@ -12,7 +12,7 @@ add_custom_command(
# flatbuffers message Message, which can be used to store deserialized
# messages in data structures. This is currently used for ObjectInfo for
# example.
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${GCS_FBS_SRC} --cpp --gen-object-api --gen-mutable
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${GCS_FBS_SRC} --cpp --gen-object-api --gen-mutable --scoped-enums
DEPENDS ${FBS_DEPENDS}
COMMENT "Running flatc compiler on ${GCS_FBS_SRC}"
VERBATIM)
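Adding --scoped-enums changes what flatc generates for the enums declared in the schema. Roughly, and hedging on the exact generated output, an unscoped C-style enum with prefixed constants such as SchedulingState_SCHEDULED becomes an enum class whose values are written SchedulingState::SCHEDULED and no longer convert implicitly to int. A small hand-written stand-in, with illustrative values not taken from gcs.fbs:

#include <type_traits>

// Hand-written stand-ins for the two flatc generation modes; values are illustrative.
enum SchedulingStateUnscoped { SchedulingState_SCHEDULED = 2, SchedulingState_LOST = 32 };
enum class SchedulingState : int { SCHEDULED = 2, LOST = 32 };

static_assert(!std::is_convertible<SchedulingState, int>::value,
              "a scoped enum does not convert implicitly to int");

int main() {
  int implicit_ok = SchedulingState_SCHEDULED;                  // unscoped: converts silently
  int explicit_only = static_cast<int>(SchedulingState::LOST);  // scoped: cast required
  return implicit_ok == 2 && explicit_only == 32 ? 0 : 1;
}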
@ -301,13 +301,13 @@ TEST_F(TestGcsWithAsio, TestLogAppendAt) {
// Task table callbacks.
void TaskAdded(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data) {
ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED);
ASSERT_EQ(data.scheduling_state, SchedulingState::SCHEDULED);
ASSERT_EQ(data.scheduler_id, kRandomId);
}

void TaskLookupHelper(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data, bool do_stop) {
ASSERT_EQ(data.scheduling_state, SchedulingState_SCHEDULED);
ASSERT_EQ(data.scheduling_state, SchedulingState::SCHEDULED);
ASSERT_EQ(data.scheduler_id, kRandomId);
if (do_stop) {
test->Stop();
@ -328,7 +328,7 @@ void TaskLookupFailure(gcs::AsyncGcsClient *client, const TaskID &id) {

void TaskLookupAfterUpdate(gcs::AsyncGcsClient *client, const TaskID &id,
const TaskTableDataT &data) {
ASSERT_EQ(data.scheduling_state, SchedulingState_LOST);
ASSERT_EQ(data.scheduling_state, SchedulingState::LOST);
test->Stop();
}

@ -345,7 +345,7 @@ void TaskUpdateCallback(gcs::AsyncGcsClient *client, const TaskID &task_id,

void TestTaskTable(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> client) {
auto data = std::make_shared<TaskTableDataT>();
data->scheduling_state = SchedulingState_SCHEDULED;
data->scheduling_state = SchedulingState::SCHEDULED;
ClientID local_scheduler_id = ClientID::from_binary(kRandomId);
data->scheduler_id = local_scheduler_id.binary();
TaskID task_id = TaskID::from_random();
@ -354,8 +354,8 @@ void TestTaskTable(const JobID &job_id, std::shared_ptr<gcs::AsyncGcsClient> cli
client->task_table().Lookup(job_id, task_id, &TaskLookup, &TaskLookupFailure));
auto update = std::make_shared<TaskTableTestAndUpdateT>();
update->test_scheduler_id = local_scheduler_id.binary();
update->test_state_bitmask = SchedulingState_SCHEDULED;
update->update_state = SchedulingState_LOST;
update->test_state_bitmask = SchedulingState::SCHEDULED;
update->update_state = SchedulingState::LOST;
// After test-and-setting, the callback will lookup the current state of the
// task.
RAY_CHECK_OK(

@ -82,7 +82,7 @@ table TaskTableData {

table TaskTableTestAndUpdate {
test_scheduler_id: string;
test_state_bitmask: int;
test_state_bitmask: SchedulingState;
update_state: SchedulingState;
}

@ -225,7 +225,7 @@ Status RedisContext::RunAsync(const std::string &command, const UniqueID &id,
Status RedisContext::SubscribeAsync(const ClientID &client_id,
const TablePubsub pubsub_channel,
const RedisCallback &redisCallback) {
RAY_CHECK(pubsub_channel != TablePubsub_NO_PUBLISH)
RAY_CHECK(pubsub_channel != TablePubsub::NO_PUBLISH)
<< "Client requested subscribe on a table that does not support pubsub";

int64_t callback_index = RedisCallbackManager::instance().add(redisCallback);

@ -85,8 +85,8 @@ class Log : virtual public PubsubInterface<ID> {
Log(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client)
: context_(context),
client_(client),
pubsub_channel_(TablePubsub_NO_PUBLISH),
prefix_(TablePrefix_UNUSED),
pubsub_channel_(TablePubsub::NO_PUBLISH),
prefix_(TablePrefix::UNUSED),
subscribe_callback_index_(-1){};

/// Append a log entry to a key.
@ -273,8 +273,8 @@ class ObjectTable : public Log<ObjectID, ObjectTableData> {
public:
ObjectTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client)
: Log(context, client) {
pubsub_channel_ = TablePubsub_OBJECT;
prefix_ = TablePrefix_OBJECT;
pubsub_channel_ = TablePubsub::OBJECT;
prefix_ = TablePrefix::OBJECT;
};
virtual ~ObjectTable(){};
};
@ -283,8 +283,8 @@ class HeartbeatTable : public Table<ClientID, HeartbeatTableData> {
public:
HeartbeatTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client)
: Table(context, client) {
pubsub_channel_ = TablePubsub_HEARTBEAT;
prefix_ = TablePrefix_HEARTBEAT;
pubsub_channel_ = TablePubsub::HEARTBEAT;
prefix_ = TablePrefix::HEARTBEAT;
}
virtual ~HeartbeatTable() {}
};
@ -293,8 +293,8 @@ class FunctionTable : public Table<ObjectID, FunctionTableData> {
public:
FunctionTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client)
: Table(context, client) {
pubsub_channel_ = TablePubsub_NO_PUBLISH;
prefix_ = TablePrefix_FUNCTION;
pubsub_channel_ = TablePubsub::NO_PUBLISH;
prefix_ = TablePrefix::FUNCTION;
};
};

@ -305,8 +305,8 @@ class ActorTable : public Log<ActorID, ActorTableData> {
public:
ActorTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client)
: Log(context, client) {
pubsub_channel_ = TablePubsub_ACTOR;
prefix_ = TablePrefix_ACTOR;
pubsub_channel_ = TablePubsub::ACTOR;
prefix_ = TablePrefix::ACTOR;
}
};

@ -315,7 +315,7 @@ class TaskReconstructionLog : public Log<TaskID, TaskReconstructionData> {
TaskReconstructionLog(const std::shared_ptr<RedisContext> &context,
AsyncGcsClient *client)
: Log(context, client) {
prefix_ = TablePrefix_TASK_RECONSTRUCTION;
prefix_ = TablePrefix::TASK_RECONSTRUCTION;
}
};

@ -325,8 +325,8 @@ class TaskTable : public Table<TaskID, ray::protocol::Task> {
public:
TaskTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client)
: Table(context, client) {
pubsub_channel_ = TablePubsub_RAYLET_TASK;
prefix_ = TablePrefix_RAYLET_TASK;
pubsub_channel_ = TablePubsub::RAYLET_TASK;
prefix_ = TablePrefix::RAYLET_TASK;
}

TaskTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client,
@ -342,8 +342,8 @@ class TaskTable : public Table<TaskID, TaskTableData> {
public:
TaskTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client)
: Table(context, client) {
pubsub_channel_ = TablePubsub_TASK;
prefix_ = TablePrefix_TASK;
pubsub_channel_ = TablePubsub::TASK;
prefix_ = TablePrefix::TASK;
};

TaskTable(const std::shared_ptr<RedisContext> &context, AsyncGcsClient *client,
@ -417,7 +417,8 @@ class TaskTable : public Table<TaskID, TaskTableData> {
Status TaskTableAdd(AsyncGcsClient *gcs_client, Task *task);

Status TaskTableTestAndUpdate(AsyncGcsClient *gcs_client, const TaskID &task_id,
const ClientID &local_scheduler_id, int test_state_bitmask,
const ClientID &local_scheduler_id,
SchedulingState test_state_bitmask,
SchedulingState update_state,
const TaskTable::TestAndUpdateCallback &callback);

@ -449,8 +450,8 @@ class ClientTable : private Log<UniqueID, ClientTableData> {
disconnected_(false),
client_id_(client_id),
local_client_() {
pubsub_channel_ = TablePubsub_CLIENT;
prefix_ = TablePrefix_CLIENT;
pubsub_channel_ = TablePubsub::CLIENT;
prefix_ = TablePrefix::CLIENT;

// Set the local client's ID.
local_client_.client_id = client_id.binary();

@ -54,7 +54,8 @@ Status TaskTableAdd(AsyncGcsClient *gcs_client, Task *task) {
// TODO(pcm): This is a helper method that should go away once we get rid of
// the Task* datastructure and replace it with TaskTableDataT.
Status TaskTableTestAndUpdate(AsyncGcsClient *gcs_client, const TaskID &task_id,
const ClientID &local_scheduler_id, int test_state_bitmask,
const ClientID &local_scheduler_id,
SchedulingState test_state_bitmask,
SchedulingState update_state,
const TaskTable::TestAndUpdateCallback &callback) {
auto data = std::make_shared<TaskTableTestAndUpdateT>();

@ -12,7 +12,7 @@ add_custom_command(
# flatbuffers message Message, which can be used to store deserialized
# messages in data structures. This is currently used for ObjectInfo for
# example.
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${OBJECT_MANAGER_FBS_SRC} --cpp --gen-object-api --gen-mutable
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${OBJECT_MANAGER_FBS_SRC} --cpp --gen-object-api --gen-mutable --scoped-enums
DEPENDS ${FBS_DEPENDS}
COMMENT "Running flatc compiler on ${OBJECT_MANAGER_FBS_SRC}"
VERBATIM)

@ -203,8 +203,9 @@ ray::Status ObjectManager::PullSendRequest(const ObjectID &object_id,
auto message = object_manager_protocol::CreatePullRequestMessage(
fbb, fbb.CreateString(client_id_.binary()), fbb.CreateString(object_id.binary()));
fbb.Finish(message);
RAY_CHECK_OK(conn->WriteMessage(object_manager_protocol::MessageType_PullRequest,
fbb.GetSize(), fbb.GetBufferPointer()));
RAY_CHECK_OK(conn->WriteMessage(
static_cast<int64_t>(object_manager_protocol::MessageType::PullRequest),
fbb.GetSize(), fbb.GetBufferPointer()));
RAY_CHECK_OK(
connection_pool_.ReleaseSender(ConnectionPool::ConnectionType::MESSAGE, conn));
return ray::Status::OK();
@ -318,9 +319,9 @@ ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id,
auto message = object_manager_protocol::CreatePushRequestMessage(
fbb, fbb.CreateString(object_id.binary()), chunk_index, data_size, metadata_size);
fbb.Finish(message);
ray::Status status =
conn->WriteMessage(object_manager_protocol::MessageType_PushRequest, fbb.GetSize(),
fbb.GetBufferPointer());
ray::Status status = conn->WriteMessage(
static_cast<int64_t>(object_manager_protocol::MessageType::PushRequest),
fbb.GetSize(), fbb.GetBufferPointer());
RAY_CHECK_OK(status);
return SendObjectData(object_id, chunk_info, conn);
}
@ -528,8 +529,9 @@ std::shared_ptr<SenderConnection> ObjectManager::CreateSenderConnection(
fbb, fbb.CreateString(client_id_.binary()), is_transfer);
fbb.Finish(message);
// Send synchronously.
RAY_CHECK_OK(conn->WriteMessage(object_manager_protocol::MessageType_ConnectClient,
fbb.GetSize(), fbb.GetBufferPointer()));
RAY_CHECK_OK(conn->WriteMessage(
static_cast<int64_t>(object_manager_protocol::MessageType::ConnectClient),
fbb.GetSize(), fbb.GetBufferPointer()));
// The connection is ready; return to caller.
return conn;
}
@ -541,19 +543,19 @@ void ObjectManager::ProcessNewClient(TcpClientConnection &conn) {
void ObjectManager::ProcessClientMessage(std::shared_ptr<TcpClientConnection> &conn,
int64_t message_type, const uint8_t *message) {
switch (message_type) {
case object_manager_protocol::MessageType_PushRequest: {
case static_cast<int64_t>(object_manager_protocol::MessageType::PushRequest): {
ReceivePushRequest(conn, message);
break;
}
case object_manager_protocol::MessageType_PullRequest: {
case static_cast<int64_t>(object_manager_protocol::MessageType::PullRequest): {
ReceivePullRequest(conn, message);
break;
}
case object_manager_protocol::MessageType_ConnectClient: {
case static_cast<int64_t>(object_manager_protocol::MessageType::ConnectClient): {
ConnectClient(conn, message);
break;
}
case protocol::MessageType_DisconnectClient: {
case static_cast<int64_t>(protocol::MessageType::DisconnectClient): {
// TODO(hme): Disconnect without depending on the node manager protocol.
DisconnectClient(conn, message);
break;
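The object manager switch above keeps the wire value as int64_t, so each case label casts the scoped enum constant; the node manager changes later in this diff take the other route and cast the switch expression once. Both forms are shown in this small sketch, which uses made-up message values rather than the generated object manager protocol:

#include <cstdint>

enum class MessageType : int64_t { PushRequest = 1, PullRequest = 2 };

// Style used above: keep the wire value as int64_t and cast each case label.
int dispatch_by_label(int64_t message_type) {
  switch (message_type) {
  case static_cast<int64_t>(MessageType::PushRequest):
    return 1;
  case static_cast<int64_t>(MessageType::PullRequest):
    return 2;
  default:
    return -1;
  }
}

// Alternative style: cast the switch expression once and use plain enum labels.
int dispatch_by_value(int64_t message_type) {
  switch (static_cast<MessageType>(message_type)) {
  case MessageType::PushRequest:
    return 1;
  case MessageType::PullRequest:
    return 2;
  default:
    return -1;
  }
}

int main() { return dispatch_by_label(1) == dispatch_by_value(1) ? 0 : 1; }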
@ -200,7 +200,7 @@ class TestObjectManagerBase : public ::testing::Test {

class StressTestObjectManager : public TestObjectManagerBase {
public:
enum TransferPattern {
enum class TransferPattern {
PUSH_A_B,
PUSH_B_A,
BIDIRECTIONAL_PUSH,
@ -214,13 +214,13 @@ class StressTestObjectManager : public TestObjectManagerBase {
uint num_expected_objects;

std::vector<TransferPattern> async_loop_patterns = {
PUSH_A_B,
PUSH_B_A,
BIDIRECTIONAL_PUSH,
PULL_A_B,
PULL_B_A,
BIDIRECTIONAL_PULL,
BIDIRECTIONAL_PULL_VARIABLE_DATA_SIZE};
TransferPattern::PUSH_A_B,
TransferPattern::PUSH_B_A,
TransferPattern::BIDIRECTIONAL_PUSH,
TransferPattern::PULL_A_B,
TransferPattern::PULL_B_A,
TransferPattern::BIDIRECTIONAL_PULL,
TransferPattern::BIDIRECTIONAL_PULL_VARIABLE_DATA_SIZE};

int num_connected_clients = 0;

@ -319,8 +319,9 @@ class StressTestObjectManager : public TestObjectManagerBase {

void TransferTestComplete() {
int64_t elapsed = current_time_ms() - start_time;
RAY_LOG(INFO) << "TransferTestComplete: " << async_loop_patterns[async_loop_index]
<< " " << v1.size() << " " << elapsed;
RAY_LOG(INFO) << "TransferTestComplete: "
<< static_cast<int>(async_loop_patterns[async_loop_index]) << " "
<< v1.size() << " " << elapsed;
ASSERT_TRUE(v1.size() == v2.size());
for (uint i = 0; i < v1.size(); ++i) {
ASSERT_TRUE(std::find(v1.begin(), v1.end(), v2[i]) != v1.end());
@ -347,9 +348,9 @@ class StressTestObjectManager : public TestObjectManagerBase {

ray::Status status = ray::Status::OK();

if (transfer_pattern == BIDIRECTIONAL_PULL ||
transfer_pattern == BIDIRECTIONAL_PUSH ||
transfer_pattern == BIDIRECTIONAL_PULL_VARIABLE_DATA_SIZE) {
if (transfer_pattern == TransferPattern::BIDIRECTIONAL_PULL ||
transfer_pattern == TransferPattern::BIDIRECTIONAL_PUSH ||
transfer_pattern == TransferPattern::BIDIRECTIONAL_PULL_VARIABLE_DATA_SIZE) {
num_expected_objects = (uint)2 * num_trials;
} else {
num_expected_objects = (uint)num_trials;
@ -358,19 +359,19 @@ class StressTestObjectManager : public TestObjectManagerBase {
start_time = current_time_ms();

switch (transfer_pattern) {
case PUSH_A_B: {
case TransferPattern::PUSH_A_B: {
for (int i = -1; ++i < num_trials;) {
ObjectID oid1 = WriteDataToClient(client1, data_size);
status = server1->object_manager_.Push(oid1, client_id_2);
}
} break;
case PUSH_B_A: {
case TransferPattern::PUSH_B_A: {
for (int i = -1; ++i < num_trials;) {
ObjectID oid2 = WriteDataToClient(client2, data_size);
status = server2->object_manager_.Push(oid2, client_id_1);
}
} break;
case BIDIRECTIONAL_PUSH: {
case TransferPattern::BIDIRECTIONAL_PUSH: {
for (int i = -1; ++i < num_trials;) {
ObjectID oid1 = WriteDataToClient(client1, data_size);
status = server1->object_manager_.Push(oid1, client_id_2);
@ -378,19 +379,19 @@ class StressTestObjectManager : public TestObjectManagerBase {
status = server2->object_manager_.Push(oid2, client_id_1);
}
} break;
case PULL_A_B: {
case TransferPattern::PULL_A_B: {
for (int i = -1; ++i < num_trials;) {
ObjectID oid1 = WriteDataToClient(client1, data_size);
status = server2->object_manager_.Pull(oid1);
}
} break;
case PULL_B_A: {
case TransferPattern::PULL_B_A: {
for (int i = -1; ++i < num_trials;) {
ObjectID oid2 = WriteDataToClient(client2, data_size);
status = server1->object_manager_.Pull(oid2);
}
} break;
case BIDIRECTIONAL_PULL: {
case TransferPattern::BIDIRECTIONAL_PULL: {
for (int i = -1; ++i < num_trials;) {
ObjectID oid1 = WriteDataToClient(client1, data_size);
status = server2->object_manager_.Pull(oid1);
@ -398,7 +399,7 @@ class StressTestObjectManager : public TestObjectManagerBase {
status = server1->object_manager_.Pull(oid2);
}
} break;
case BIDIRECTIONAL_PULL_VARIABLE_DATA_SIZE: {
case TransferPattern::BIDIRECTIONAL_PULL_VARIABLE_DATA_SIZE: {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> dis(1, 50);
@ -410,7 +411,8 @@ class StressTestObjectManager : public TestObjectManagerBase {
}
} break;
default: {
RAY_LOG(FATAL) << "No case for transfer_pattern " << transfer_pattern;
RAY_LOG(FATAL) << "No case for transfer_pattern "
<< static_cast<int>(transfer_pattern);
} break;
}
}

@ -289,7 +289,6 @@ class TestObjectManager : public TestObjectManagerBase {
}

void TestWaitWhileSubscribed(UniqueID sub_id, ObjectID object_1, ObjectID object_2) {
int num_objects = 2;
int required_objects = 1;
int timeout_ms = 1000;

@ -300,7 +299,7 @@ class TestObjectManager : public TestObjectManagerBase {

RAY_CHECK_OK(server1->object_manager_.AddWaitRequest(
wait_id, object_ids, timeout_ms, required_objects, false,
[this, sub_id, object_1, object_ids, num_objects, start_time](
[this, sub_id, object_1, object_ids, start_time](
const std::vector<ray::ObjectID> &found,
const std::vector<ray::ObjectID> &remaining) {
int64_t elapsed = (boost::posix_time::second_clock::local_time() - start_time)

@ -12,7 +12,7 @@ add_custom_command(
# flatbuffers message Message, which can be used to store deserialized
# messages in data structures. This is currently used for ObjectInfo for
# example.
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${NODE_MANAGER_FBS_SRC} --cpp --gen-object-api --gen-mutable
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${NODE_MANAGER_FBS_SRC} --cpp --gen-object-api --gen-mutable --scoped-enums
DEPENDS ${FBS_DEPENDS}
COMMENT "Running flatc compiler on ${NODE_MANAGER_FBS_SRC}"
VERBATIM)

@ -48,7 +48,7 @@ Lineage::Lineage(const protocol::ForwardTaskRequest &task_request) {
auto tasks = task_request.uncommitted_tasks();
for (auto it = tasks->begin(); it != tasks->end(); it++) {
auto task = Task(**it);
LineageEntry entry(task, GcsStatus_UNCOMMITTED_REMOTE);
LineageEntry entry(task, GcsStatus::UNCOMMITTED_REMOTE);
RAY_CHECK(SetEntry(std::move(entry)));
}
}
@ -74,7 +74,7 @@ boost::optional<LineageEntry &> Lineage::GetEntryMutable(const UniqueID &task_id
bool Lineage::SetEntry(LineageEntry &&new_entry) {
// Get the status of the current entry at the key.
auto task_id = new_entry.GetEntryId();
GcsStatus current_status = GcsStatus_NONE;
GcsStatus current_status = GcsStatus::NONE;
auto current_entry = PopEntry(task_id);
if (current_entry) {
current_status = current_entry->GetStatus();
@ -172,11 +172,11 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l
auto task_id = task.GetTaskSpecification().TaskId();
// Merge the uncommitted lineage into the lineage cache.
MergeLineageHelper(task_id, uncommitted_lineage, lineage_, [](GcsStatus status) {
if (status != GcsStatus_NONE) {
if (status != GcsStatus::NONE) {
// We received the uncommitted lineage from a remote node, so make sure
// that all entries in the lineage to merge have status
// UNCOMMITTED_REMOTE.
RAY_CHECK(status == GcsStatus_UNCOMMITTED_REMOTE);
RAY_CHECK(status == GcsStatus::UNCOMMITTED_REMOTE);
}
// The only stopping condition is that an entry is not found.
return false;
@ -186,13 +186,13 @@ void LineageCache::AddWaitingTask(const Task &task, const Lineage &uncommitted_l
// it. Unsubscribe since we are now responsible for committing the task.
auto entry = lineage_.GetEntry(task_id);
if (entry) {
RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE);
RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE);
UnsubscribeTask(task_id);
}

// Add the submitted task to the lineage cache as UNCOMMITTED_WAITING. It
// should be marked as UNCOMMITTED_READY once the task starts execution.
LineageEntry task_entry(task, GcsStatus_UNCOMMITTED_WAITING);
LineageEntry task_entry(task, GcsStatus::UNCOMMITTED_WAITING);
RAY_CHECK(lineage_.SetEntry(std::move(task_entry)));
}

@ -202,9 +202,9 @@ void LineageCache::AddReadyTask(const Task &task) {
// Tasks can only become READY if they were in WAITING.
auto entry = lineage_.GetEntry(task_id);
RAY_CHECK(entry);
RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_WAITING);
RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_WAITING);

auto new_entry = LineageEntry(task, GcsStatus_UNCOMMITTED_READY);
auto new_entry = LineageEntry(task, GcsStatus::UNCOMMITTED_READY);
RAY_CHECK(lineage_.SetEntry(std::move(new_entry)));
// Attempt to flush the task.
bool flushed = FlushTask(task_id);
@ -234,11 +234,11 @@ void LineageCache::RemoveWaitingTask(const TaskID &task_id) {
auto entry = lineage_.PopEntry(task_id);
// It's only okay to remove a task that is waiting for execution.
// TODO(swang): Is this necessarily true when there is reconstruction?
RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_WAITING);
RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_WAITING);
// Reset the status to REMOTE. We keep the task instead of removing it
// completely in case another task is submitted locally that depends on this
// one.
entry->ResetStatus(GcsStatus_UNCOMMITTED_REMOTE);
entry->ResetStatus(GcsStatus::UNCOMMITTED_REMOTE);
RAY_CHECK(lineage_.SetEntry(std::move(*entry)));

// Request a notification for every max_lineage_size_ tasks,
@ -272,7 +272,7 @@ Lineage LineageCache::GetUncommittedLineage(const TaskID &task_id) const {
bool LineageCache::FlushTask(const TaskID &task_id) {
auto entry = lineage_.GetEntry(task_id);
RAY_CHECK(entry);
RAY_CHECK(entry->GetStatus() == GcsStatus_UNCOMMITTED_READY);
RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED_READY);

// Check if all arguments have been committed to the GCS before writing
// this task.
@ -283,14 +283,14 @@ bool LineageCache::FlushTask(const TaskID &task_id) {
// committed yet, then as far as we know, it's still in flight to the
// GCS. Skip this task for now.
if (parent) {
RAY_CHECK(parent->GetStatus() != GcsStatus_UNCOMMITTED_WAITING)
RAY_CHECK(parent->GetStatus() != GcsStatus::UNCOMMITTED_WAITING)
<< "Children should not become ready to flush before their parents.";
// Request notifications about the parent entry's commit in the GCS if
// the parent is remote. Otherwise, the parent is local and will
// eventually be flushed. In either case, once we receive a
// notification about the task's commit via HandleEntryCommitted, then
// this task will be ready to write on the next call to Flush().
if (parent->GetStatus() == GcsStatus_UNCOMMITTED_REMOTE) {
if (parent->GetStatus() == GcsStatus::UNCOMMITTED_REMOTE) {
SubscribeTask(parent_id);
}
all_arguments_committed = false;
@ -319,7 +319,7 @@ bool LineageCache::FlushTask(const TaskID &task_id) {
// We successfully wrote the task, so mark it as committing.
// TODO(swang): Use a batched interface and write with all object entries.
auto entry = lineage_.PopEntry(task_id);
RAY_CHECK(entry->SetStatus(GcsStatus_COMMITTING));
RAY_CHECK(entry->SetStatus(GcsStatus::COMMITTING));
RAY_CHECK(lineage_.SetEntry(std::move(*entry)));
}
return all_arguments_committed;
@ -375,7 +375,7 @@ void LineageCache::EvictRemoteLineage(const UniqueID &task_id) {
// Tasks are committed in data dependency order per node, so the only
// ancestors of a committed task should be other remote tasks.
auto status = entry->GetStatus();
RAY_CHECK(status == GcsStatus_UNCOMMITTED_REMOTE);
RAY_CHECK(status == GcsStatus::UNCOMMITTED_REMOTE);
// We are evicting the remote ancestors of a task, so there should not be
// any dependent tasks that need to be flushed.
RAY_CHECK(uncommitted_ready_children_.count(task_id) == 0);

@ -16,19 +16,19 @@ namespace ray {
namespace raylet {

/// The status of a lineage cache entry according to its status in the GCS.
enum GcsStatus {
enum class GcsStatus {
/// The task is not in the lineage cache.
GcsStatus_NONE = 0,
NONE = 0,
/// The task is being executed or created on a remote node.
GcsStatus_UNCOMMITTED_REMOTE,
UNCOMMITTED_REMOTE,
/// The task is waiting to be executed or created locally.
GcsStatus_UNCOMMITTED_WAITING,
UNCOMMITTED_WAITING,
/// The task has started execution, but the entry has not been written to the
/// GCS yet.
GcsStatus_UNCOMMITTED_READY,
UNCOMMITTED_READY,
/// The task has been written to the GCS and we are waiting for an
/// acknowledgement of the commit.
GcsStatus_COMMITTING,
COMMITTING,
};

/// \class LineageEntry
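A short standalone illustration of why every GcsStatus_X use in lineage_cache.cc becomes GcsStatus::X: with enum class the enumerators are only reachable through the enum's name, so call sites must qualify them. The enumerators below mirror the ones defined above:

enum class GcsStatus { NONE = 0, UNCOMMITTED_REMOTE, UNCOMMITTED_WAITING, UNCOMMITTED_READY, COMMITTING };

bool IsCommitting(GcsStatus status) {
  // Writing COMMITTING alone would not compile here; qualification is required.
  return status == GcsStatus::COMMITTING;
}

int main() { return IsCommitting(GcsStatus::COMMITTING) ? 0 : 1; }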
@ -12,34 +12,34 @@ namespace local_scheduler_protocol = ray::local_scheduler::protocol;
static_assert(static_cast<int>(x) == static_cast<int>(y), "protocol mismatch")

// Check consistency between client and server protocol.
RAY_CHECK_ENUM(protocol::MessageType_SubmitTask,
local_scheduler_protocol::MessageType_SubmitTask);
RAY_CHECK_ENUM(protocol::MessageType_TaskDone,
local_scheduler_protocol::MessageType_TaskDone);
RAY_CHECK_ENUM(protocol::MessageType_EventLogMessage,
local_scheduler_protocol::MessageType_EventLogMessage);
RAY_CHECK_ENUM(protocol::MessageType_RegisterClientRequest,
local_scheduler_protocol::MessageType_RegisterClientRequest);
RAY_CHECK_ENUM(protocol::MessageType_RegisterClientReply,
local_scheduler_protocol::MessageType_RegisterClientReply);
RAY_CHECK_ENUM(protocol::MessageType_DisconnectClient,
local_scheduler_protocol::MessageType_DisconnectClient);
RAY_CHECK_ENUM(protocol::MessageType_GetTask,
local_scheduler_protocol::MessageType_GetTask);
RAY_CHECK_ENUM(protocol::MessageType_ExecuteTask,
local_scheduler_protocol::MessageType_ExecuteTask);
RAY_CHECK_ENUM(protocol::MessageType_ReconstructObject,
local_scheduler_protocol::MessageType_ReconstructObject);
RAY_CHECK_ENUM(protocol::MessageType_NotifyUnblocked,
local_scheduler_protocol::MessageType_NotifyUnblocked);
RAY_CHECK_ENUM(protocol::MessageType_PutObject,
local_scheduler_protocol::MessageType_PutObject);
RAY_CHECK_ENUM(protocol::MessageType_GetActorFrontierRequest,
local_scheduler_protocol::MessageType_GetActorFrontierRequest);
RAY_CHECK_ENUM(protocol::MessageType_GetActorFrontierReply,
local_scheduler_protocol::MessageType_GetActorFrontierReply);
RAY_CHECK_ENUM(protocol::MessageType_SetActorFrontier,
local_scheduler_protocol::MessageType_SetActorFrontier);
RAY_CHECK_ENUM(protocol::MessageType::SubmitTask,
local_scheduler_protocol::MessageType::SubmitTask);
RAY_CHECK_ENUM(protocol::MessageType::TaskDone,
local_scheduler_protocol::MessageType::TaskDone);
RAY_CHECK_ENUM(protocol::MessageType::EventLogMessage,
local_scheduler_protocol::MessageType::EventLogMessage);
RAY_CHECK_ENUM(protocol::MessageType::RegisterClientRequest,
local_scheduler_protocol::MessageType::RegisterClientRequest);
RAY_CHECK_ENUM(protocol::MessageType::RegisterClientReply,
local_scheduler_protocol::MessageType::RegisterClientReply);
RAY_CHECK_ENUM(protocol::MessageType::DisconnectClient,
local_scheduler_protocol::MessageType::DisconnectClient);
RAY_CHECK_ENUM(protocol::MessageType::GetTask,
local_scheduler_protocol::MessageType::GetTask);
RAY_CHECK_ENUM(protocol::MessageType::ExecuteTask,
local_scheduler_protocol::MessageType::ExecuteTask);
RAY_CHECK_ENUM(protocol::MessageType::ReconstructObject,
local_scheduler_protocol::MessageType::ReconstructObject);
RAY_CHECK_ENUM(protocol::MessageType::NotifyUnblocked,
local_scheduler_protocol::MessageType::NotifyUnblocked);
RAY_CHECK_ENUM(protocol::MessageType::PutObject,
local_scheduler_protocol::MessageType::PutObject);
RAY_CHECK_ENUM(protocol::MessageType::GetActorFrontierRequest,
local_scheduler_protocol::MessageType::GetActorFrontierRequest);
RAY_CHECK_ENUM(protocol::MessageType::GetActorFrontierReply,
local_scheduler_protocol::MessageType::GetActorFrontierReply);
RAY_CHECK_ENUM(protocol::MessageType::SetActorFrontier,
local_scheduler_protocol::MessageType::SetActorFrontier);

/// A helper function to determine whether a given actor task has already been executed
/// according to the given actor registry. Returns true if the task is a duplicate.
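The RAY_CHECK_ENUM checks above expand to the static_assert defined at the top of this hunk, comparing the underlying integer values of the two protocols' enums. A compilable sketch with stand-in enums, since the real message types come from the generated flatbuffer headers:

#define RAY_CHECK_ENUM(x, y) \
  static_assert(static_cast<int>(x) == static_cast<int>(y), "protocol mismatch")

enum class NodeManagerMessageType : int { SubmitTask = 0, TaskDone = 1 };
enum class LocalSchedulerMessageType : int { SubmitTask = 0, TaskDone = 1 };

// Compiles only if the two protocols assign the same underlying value.
RAY_CHECK_ENUM(NodeManagerMessageType::SubmitTask, LocalSchedulerMessageType::SubmitTask);
RAY_CHECK_ENUM(NodeManagerMessageType::TaskDone, LocalSchedulerMessageType::TaskDone);

int main() { return 0; }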
@ -344,8 +344,8 @@ void NodeManager::ProcessClientMessage(
const uint8_t *message_data) {
RAY_LOG(DEBUG) << "Message of type " << message_type;

switch (message_type) {
case protocol::MessageType_RegisterClientRequest: {
switch (static_cast<protocol::MessageType>(message_type)) {
case protocol::MessageType::RegisterClientRequest: {
auto message = flatbuffers::GetRoot<protocol::RegisterClientRequest>(message_data);
if (message->is_worker()) {
// Create a new worker from the registration request.
@ -354,7 +354,7 @@ void NodeManager::ProcessClientMessage(
worker_pool_.RegisterWorker(std::move(worker));
}
} break;
case protocol::MessageType_GetTask: {
case protocol::MessageType::GetTask: {
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
RAY_CHECK(worker);
// If the worker was assigned a task, mark it as finished.
@ -367,7 +367,7 @@ void NodeManager::ProcessClientMessage(
DispatchTasks();

} break;
case protocol::MessageType_DisconnectClient: {
case protocol::MessageType::DisconnectClient: {
// Remove the dead worker from the pool and stop listening for messages.
const std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
if (worker) {
@ -380,7 +380,7 @@ void NodeManager::ProcessClientMessage(
}
return;
} break;
case protocol::MessageType_SubmitTask: {
case protocol::MessageType::SubmitTask: {
// Read the task submitted by the client.
auto message = flatbuffers::GetRoot<protocol::SubmitTaskRequest>(message_data);
TaskExecutionSpecification task_execution_spec(
@ -391,7 +391,7 @@ void NodeManager::ProcessClientMessage(
// locally, there is no uncommitted lineage.
SubmitTask(task, Lineage());
} break;
case protocol::MessageType_ReconstructObject: {
case protocol::MessageType::ReconstructObject: {
// TODO(hme): handle multiple object ids.
auto message = flatbuffers::GetRoot<protocol::ReconstructObject>(message_data);
ObjectID object_id = from_flatbuf(*message->object_id());
@ -430,7 +430,7 @@ void NodeManager::ProcessClientMessage(
DispatchTasks();
}
} break;
case protocol::MessageType_NotifyUnblocked: {
case protocol::MessageType::NotifyUnblocked: {
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
// Re-acquire the CPU resources for the task that was assigned to the
// unblocked worker.
@ -460,7 +460,7 @@ void NodeManager::ProcessClientMessage(
worker->MarkUnblocked();
}
} break;
case protocol::MessageType_WaitRequest: {
case protocol::MessageType::WaitRequest: {
// Read the data.
auto message = flatbuffers::GetRoot<protocol::WaitRequest>(message_data);
std::vector<ObjectID> object_ids = from_flatbuf(*message->object_ids());
@ -470,14 +470,15 @@ void NodeManager::ProcessClientMessage(

ray::Status status = object_manager_.Wait(
object_ids, wait_ms, num_required_objects, wait_local,
[this, client](std::vector<ObjectID> found, std::vector<ObjectID> remaining) {
[client](std::vector<ObjectID> found, std::vector<ObjectID> remaining) {
// Write the data.
flatbuffers::FlatBufferBuilder fbb;
flatbuffers::Offset<protocol::WaitReply> wait_reply = protocol::CreateWaitReply(
fbb, to_flatbuf(fbb, found), to_flatbuf(fbb, remaining));
fbb.Finish(wait_reply);
RAY_CHECK_OK(client->WriteMessage(protocol::MessageType_WaitReply,
fbb.GetSize(), fbb.GetBufferPointer()));
RAY_CHECK_OK(
client->WriteMessage(static_cast<int64_t>(protocol::MessageType::WaitReply),
fbb.GetSize(), fbb.GetBufferPointer()));
});
RAY_CHECK_OK(status);
} break;
@ -497,8 +498,8 @@ void NodeManager::ProcessNewNodeManager(TcpClientConnection &node_manager_client
void NodeManager::ProcessNodeManagerMessage(TcpClientConnection &node_manager_client,
int64_t message_type,
const uint8_t *message_data) {
switch (message_type) {
case protocol::MessageType_ForwardTaskRequest: {
switch (static_cast<protocol::MessageType>(message_type)) {
case protocol::MessageType::ForwardTaskRequest: {
auto message = flatbuffers::GetRoot<protocol::ForwardTaskRequest>(message_data);
TaskID task_id = from_flatbuf(*message->task_id());

@ -668,8 +669,9 @@ void NodeManager::AssignTask(Task &task) {
auto message = protocol::CreateGetTaskReply(fbb, spec.ToFlatbuffer(fbb),
fbb.CreateVector(std::vector<int>()));
fbb.Finish(message);
auto status = worker->Connection()->WriteMessage(protocol::MessageType_ExecuteTask,
fbb.GetSize(), fbb.GetBufferPointer());
auto status = worker->Connection()->WriteMessage(
static_cast<int64_t>(protocol::MessageType::ExecuteTask), fbb.GetSize(),
fbb.GetBufferPointer());
if (status.ok()) {
// Resource accounting: acquire resources for the assigned task.
const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId();
@ -707,7 +709,8 @@ void NodeManager::AssignTask(Task &task) {
} else {
RAY_LOG(WARNING) << "Failed to send task to worker, disconnecting client";
// We failed to send the task to the worker, so disconnect the worker.
ProcessClientMessage(worker->Connection(), protocol::MessageType_DisconnectClient,
ProcessClientMessage(worker->Connection(),
static_cast<int64_t>(protocol::MessageType::DisconnectClient),
NULL);
// Queue this task for future assignment. The task will be assigned to a
// worker once one becomes available.
@ -829,8 +832,9 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id)
}

auto &server_conn = it->second;
auto status = server_conn.WriteMessage(protocol::MessageType_ForwardTaskRequest,
fbb.GetSize(), fbb.GetBufferPointer());
auto status = server_conn.WriteMessage(
static_cast<int64_t>(protocol::MessageType::ForwardTaskRequest), fbb.GetSize(),
fbb.GetBufferPointer());
if (status.ok()) {
// If we were able to forward the task, remove the forwarded task from the
// lineage cache since the receiving node is now responsible for writing

@ -148,13 +148,13 @@ SchedulingResources::~SchedulingResources() {}
ResourceAvailabilityStatus SchedulingResources::CheckResourcesSatisfied(
ResourceSet &resources) const {
if (!resources.IsSubset(this->resources_total_)) {
return kInfeasible;
return ResourceAvailabilityStatus::kInfeasible;
}
// Resource demand specified is feasible. Check if it's available.
if (!resources.IsSubset(this->resources_available_)) {
return kResourcesUnavailable;
return ResourceAvailabilityStatus::kResourcesUnavailable;
}
return kFeasible;
return ResourceAvailabilityStatus::kFeasible;
}

const ResourceSet &SchedulingResources::GetAvailableResources() const {

@ -14,11 +14,11 @@ const std::string kCPU_ResourceLabel = "CPU";

/// Resource availability status reports whether the resource requirement is
/// (1) infeasible, (2) feasible but currently unavailable, or (3) available.
typedef enum {
enum class ResourceAvailabilityStatus : int {
kInfeasible, ///< Cannot ever satisfy resource requirements.
kResourcesUnavailable, ///< Feasible, but not currently available.
kFeasible ///< Feasible and currently available.
} ResourceAvailabilityStatus;
};

/// \class ResourceSet
/// \brief Encapsulates and operates on a set of resources, including CPUs,