#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>

#include "common.h"
#include "common_protocol.h"
#include "event_loop.h"
#include "format/local_scheduler_generated.h"
#include "io.h"
#include "logging.h"
#include "local_scheduler_shared.h"
#include "local_scheduler.h"
#include "local_scheduler_algorithm.h"
#include "state/actor_notification_table.h"
#include "state/db.h"
#include "state/task_table.h"
#include "state/object_table.h"
#include "state/error_table.h"
#include "utarray.h"

UT_icd task_ptr_icd = {sizeof(Task *), NULL, NULL, NULL};

UT_icd pid_t_icd = {sizeof(pid_t), NULL, NULL, NULL};

UT_icd byte_icd = {sizeof(uint8_t), NULL, NULL, NULL};
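
/* Note: each UT_icd above is a utarray element descriptor of the form
 * {element size, init, copy, dtor}. The NULL callbacks mean the elements are
 * copied bitwise and need no construction or destruction. */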

/**
 * A helper function for printing available and requested resource information.
 *
 * @param state Local scheduler state.
 * @param spec Task specification object.
 * @return Void.
 */
void print_resource_info(const LocalSchedulerState *state,
                         const TaskSpec *spec) {
#if RAY_COMMON_LOG_LEVEL <= RAY_COMMON_DEBUG
  /* Print information about available and requested resources. */
  char buftotal[256], bufavail[256], bufresreq[256];
  snprintf(bufavail, sizeof(bufavail), "%8.4f %8.4f",
           state->dynamic_resources[ResourceIndex_CPU],
           state->dynamic_resources[ResourceIndex_GPU]);
  snprintf(buftotal, sizeof(buftotal), "%8.4f %8.4f",
           state->static_resources[ResourceIndex_CPU],
           state->static_resources[ResourceIndex_GPU]);
  if (spec) {
    snprintf(bufresreq, sizeof(bufresreq), "%8.4f %8.4f",
             TaskSpec_get_required_resource(spec, ResourceIndex_CPU),
             TaskSpec_get_required_resource(spec, ResourceIndex_GPU));
  }
  LOG_DEBUG("Resources: [total=%s][available=%s][requested=%s]", buftotal,
            bufavail, spec ? bufresreq : "n/a");
#endif
}

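/* Timer handler used by kill_worker: after a worker has been sent SIGTERM and
 * given a grace period to exit, this forcibly kills it with SIGKILL, closes
 * its socket, and frees the worker struct. */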
int force_kill_worker(event_loop *loop, timer_id id, void *context) {
  LocalSchedulerClient *worker = (LocalSchedulerClient *) context;
  kill(worker->pid, SIGKILL);
  close(worker->sock);
  delete worker;
  return EVENT_LOOP_TIMER_DONE;
}

/**
 * Kill a worker, if it is a child process, and clean up all of its associated
 * state.
 *
 * @param state The local scheduler state.
 * @param worker A pointer to the worker we want to kill.
 * @param cleanup A bool representing whether we're cleaning up the entire local
 *        scheduler's state, or just this worker. If true, then the worker will
 *        be force-killed immediately. Else, the worker will be given a chance
 *        to clean up its own state.
 * @return Void.
 */
void kill_worker(LocalSchedulerState *state,
                 LocalSchedulerClient *worker,
                 bool cleanup) {
  /* Erase the local scheduler's reference to the worker. */
  auto it = std::find(state->workers.begin(), state->workers.end(), worker);
  CHECK(it != state->workers.end());
  state->workers.erase(it);

  /* Make sure that we removed the worker. */
  it = std::find(state->workers.begin(), state->workers.end(), worker);
  CHECK(it == state->workers.end());

  /* Erase the algorithm state's reference to the worker. */
  handle_worker_removed(state, state->algorithm_state, worker);

  /* Remove the client socket from the event loop so that we don't process the
   * SIGPIPE when the worker is killed. */
  event_loop_remove_file(state->loop, worker->sock);

  /* If the worker has registered a process ID with us and it's a child
   * process, use it to send a kill signal. */
  bool free_worker = true;
  if (worker->is_child && worker->pid != 0) {
    if (cleanup) {
      /* If we're exiting the local scheduler anyway, it's okay to force kill
       * the worker immediately. Wait for the process to exit. */
      kill(worker->pid, SIGKILL);
      waitpid(worker->pid, NULL, 0);
      close(worker->sock);
    } else {
      /* If we're just cleaning up a single worker, allow it some time to clean
       * up its state before force killing. The client socket will be closed
       * and the worker struct will be freed after the timeout. */
      kill(worker->pid, SIGTERM);
      event_loop_add_timer(state->loop, KILL_WORKER_TIMEOUT_MILLISECONDS,
                           force_kill_worker, (void *) worker);
      free_worker = false;
    }
    LOG_INFO("Killed worker with pid %d", worker->pid);
  }

  /* If this worker is still running a task and we aren't cleaning up, push an
   * error message to the driver responsible for the task. */
  if (worker->task_in_progress != NULL && !cleanup) {
    TaskSpec *spec = Task_task_spec(worker->task_in_progress);
    TaskID task_id = TaskSpec_task_id(spec);
    push_error(state->db, TaskSpec_driver_id(spec), WORKER_DIED_ERROR_INDEX,
               sizeof(task_id), task_id.id);
  }

  /* Clean up the task in progress. */
  if (worker->task_in_progress) {
    if (!worker->is_blocked) {
      /* Return the resources that the worker was using, if any. Blocked
       * workers do not use resources. */
      TaskSpec *spec = Task_task_spec(worker->task_in_progress);
      update_dynamic_resources(state, spec, true);
    }
    /* Update the task table to reflect that the task failed to complete. */
    if (state->db != NULL) {
      Task_set_state(worker->task_in_progress, TASK_STATUS_LOST);
      task_table_update(state->db, worker->task_in_progress, NULL, NULL, NULL);
    } else {
      Task_free(worker->task_in_progress);
    }
  }

  LOG_DEBUG("Killed worker with pid %d", worker->pid);
  if (free_worker) {
    /* Clean up the client socket after killing the worker so that the worker
     * can't receive the SIGPIPE before exiting. */
    close(worker->sock);
    delete worker;
  }
}

void LocalSchedulerState_free(LocalSchedulerState *state) {
  /* Reset the SIGTERM handler to default behavior, so we try to clean up the
   * local scheduler at most once. If a SIGTERM is caught afterwards, there is
   * the possibility of orphan worker processes. */
  signal(SIGTERM, SIG_DFL);

  /* Kill any child processes that didn't register as a worker yet. */
  for (auto const &worker_pid : state->child_pids) {
    kill(worker_pid, SIGKILL);
    waitpid(worker_pid, NULL, 0);
    LOG_INFO("Killed worker pid %d which hadn't started yet.", worker_pid);
  }

  /* Kill any registered workers. */
  /* TODO(swang): It's possible that the local scheduler will exit before all
   * of its task table updates make it to redis. */
  while (state->workers.size() > 0) {
    /* Note that kill_worker modifies the container state->workers, so it is
     * important to do this loop in a way that does not use invalidated
     * iterators. */
    kill_worker(state, state->workers.back(), true);
  }

  /* Disconnect from plasma. */
  plasma_disconnect(state->plasma_conn);
  state->plasma_conn = NULL;

  /* Disconnect from the database. */
  if (state->db != NULL) {
    db_disconnect(state->db);
    state->db = NULL;
  }

  /* Free the command for starting new workers. */
  if (state->config.start_worker_command != NULL) {
    int i = 0;
    const char *arg = state->config.start_worker_command[i];
    while (arg != NULL) {
      free((void *) arg);
      ++i;
      arg = state->config.start_worker_command[i];
    }
    free(state->config.start_worker_command);
    state->config.start_worker_command = NULL;
  }

  /* Free the mapping from the actor ID to the ID of the local scheduler
   * responsible for that actor. */
  actor_map_entry *current_actor_map_entry, *temp_actor_map_entry;
  HASH_ITER(hh, state->actor_mapping, current_actor_map_entry,
            temp_actor_map_entry) {
    HASH_DEL(state->actor_mapping, current_actor_map_entry);
    free(current_actor_map_entry);
  }

  /* Free the algorithm state. */
  SchedulingAlgorithmState_free(state->algorithm_state);
  state->algorithm_state = NULL;
  /* Free the input buffer. */
  utarray_free(state->input_buffer);
  state->input_buffer = NULL;
  /* Destroy the event loop. */
  event_loop_destroy(state->loop);
  state->loop = NULL;

  /* Free the scheduler state. */
  delete state;
}

/**
 * Start a new worker as a child process.
 *
 * @param state The state of the local scheduler.
 * @param actor_id The ID of the actor that the new worker is responsible for,
 *        or NIL_ACTOR_ID if the worker will not run an actor.
 * @return Void.
 */
void start_worker(LocalSchedulerState *state, ActorID actor_id) {
  /* We can't start a worker if we don't have the path to the worker script. */
  if (state->config.start_worker_command == NULL) {
    LOG_WARN("No valid command to start worker provided. Cannot start worker.");
    return;
  }
  /* Launch the process to create the worker. */
  pid_t pid = fork();
  if (pid != 0) {
    state->child_pids.push_back(pid);
    LOG_INFO("Started worker with pid %d", pid);
    return;
  }

  char id_string[ID_STRING_SIZE];
  ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE);
  /* Figure out how many arguments there are in the start_worker_command. */
  int num_args = 0;
  for (; state->config.start_worker_command[num_args] != NULL; ++num_args) {
  }
  const char **start_actor_worker_command =
      (const char **) malloc((num_args + 3) * sizeof(const char *));
  for (int i = 0; i < num_args; ++i) {
    start_actor_worker_command[i] = state->config.start_worker_command[i];
  }
  start_actor_worker_command[num_args] = "--actor-id";
  start_actor_worker_command[num_args + 1] = (const char *) id_string;
  start_actor_worker_command[num_args + 2] = NULL;
  /* Try to execute the worker command. Exit if we're not successful. */
  execvp(start_actor_worker_command[0],
         (char *const *) start_actor_worker_command);
  free(start_actor_worker_command);
  LocalSchedulerState_free(state);
  LOG_FATAL("Failed to start worker");
}

/**
 * Parse the command to start a worker. This takes in the command string,
 * splits it into tokens on the space characters, and allocates an array of the
 * tokens, terminated by a NULL pointer.
 *
 * @param command The command string to start a worker.
 * @return A pointer to an array of strings, the tokens in the command string.
 *         The last element is a NULL pointer.
 */
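/* For example (illustrative): parse_command("python default_worker.py --arg")
 * returns {"python", "default_worker.py", "--arg", NULL}, with each token
 * allocated by strdup and the array itself allocated by malloc. */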
const char **parse_command(const char *command) {
  /* Count the number of tokens. */
  char *command_copy = strdup(command);
  const char *delimiter = " ";
  char *token = NULL;
  int num_args = 0;
  token = strtok(command_copy, delimiter);
  while (token != NULL) {
    ++num_args;
    token = strtok(NULL, delimiter);
  }
  free(command_copy);

  /* Allocate a NULL-terminated array for the tokens. */
  const char **command_args =
      (const char **) malloc((num_args + 1) * sizeof(const char *));
  command_args[num_args] = NULL;

  /* Fill in the token array. */
  command_copy = strdup(command);
  token = strtok(command_copy, delimiter);
  int i = 0;
  while (token != NULL) {
    command_args[i] = strdup(token);
    ++i;
    token = strtok(NULL, delimiter);
  }
  free(command_copy);

  CHECK(num_args == i);
  return command_args;
}

LocalSchedulerState *LocalSchedulerState_init(
    const char *node_ip_address,
    event_loop *loop,
    const char *redis_addr,
    int redis_port,
    const char *local_scheduler_socket_name,
    const char *plasma_store_socket_name,
    const char *plasma_manager_socket_name,
    const char *plasma_manager_address,
    bool global_scheduler_exists,
    const double static_resource_conf[],
    const char *start_worker_command,
    int num_workers) {
  LocalSchedulerState *state = new LocalSchedulerState();
  /* Set the configuration struct for the local scheduler. */
  if (start_worker_command != NULL) {
    state->config.start_worker_command = parse_command(start_worker_command);
  } else {
    state->config.start_worker_command = NULL;
  }
  if (start_worker_command == NULL) {
    LOG_WARN(
        "No valid command to start a worker provided, local scheduler will not "
        "start any workers.");
  }
  state->config.global_scheduler_exists = global_scheduler_exists;

  state->loop = loop;

  /* Initialize the hash table mapping actor ID to the ID of the local scheduler
   * that is responsible for that actor. */
  state->actor_mapping = NULL;
  /* Connect to Redis if a Redis address is provided. */
  if (redis_addr != NULL) {
    int num_args;
    const char **db_connect_args = NULL;
    /* Use UT_string to convert the resource value into a string. */
    UT_string *num_cpus;
    UT_string *num_gpus;
    utstring_new(num_cpus);
    utstring_new(num_gpus);
    utstring_printf(num_cpus, "%f", static_resource_conf[0]);
    utstring_printf(num_gpus, "%f", static_resource_conf[1]);
    if (plasma_manager_address != NULL) {
      num_args = 8;
      db_connect_args = (const char **) malloc(sizeof(char *) * num_args);
      db_connect_args[0] = "local_scheduler_socket_name";
      db_connect_args[1] = local_scheduler_socket_name;
      db_connect_args[2] = "num_cpus";
      db_connect_args[3] = utstring_body(num_cpus);
      db_connect_args[4] = "num_gpus";
      db_connect_args[5] = utstring_body(num_gpus);
      db_connect_args[6] = "aux_address";
      db_connect_args[7] = plasma_manager_address;
    } else {
      num_args = 6;
      db_connect_args = (const char **) malloc(sizeof(char *) * num_args);
      db_connect_args[0] = "local_scheduler_socket_name";
      db_connect_args[1] = local_scheduler_socket_name;
      db_connect_args[2] = "num_cpus";
      db_connect_args[3] = utstring_body(num_cpus);
      db_connect_args[4] = "num_gpus";
      db_connect_args[5] = utstring_body(num_gpus);
    }
    state->db = db_connect(redis_addr, redis_port, "local_scheduler",
                           node_ip_address, num_args, db_connect_args);
    utstring_free(num_cpus);
    utstring_free(num_gpus);
    free(db_connect_args);
    db_attach(state->db, loop, false);
  } else {
    state->db = NULL;
  }
  /* Connect to Plasma. This method will retry if Plasma hasn't started yet. */
  state->plasma_conn =
      plasma_connect(plasma_store_socket_name, plasma_manager_socket_name,
                     PLASMA_DEFAULT_RELEASE_DELAY);
  /* Subscribe to notifications about sealed objects. */
  int plasma_fd = plasma_subscribe(state->plasma_conn);
  /* Add the callback that processes the notification to the event loop. */
  event_loop_add_file(loop, plasma_fd, EVENT_LOOP_READ,
                      process_plasma_notification, state);
  /* Add scheduler state. */
  state->algorithm_state = SchedulingAlgorithmState_init();
  /* Add the input buffer. This is used to read in messages from clients without
   * having to reallocate a new buffer every time. */
  utarray_new(state->input_buffer, &byte_icd);

  /* Initialize resource vectors. */
  for (int i = 0; i < ResourceIndex_MAX; i++) {
    state->static_resources[i] = state->dynamic_resources[i] =
        static_resource_conf[i];
  }
  /* Print some debug information about resource configuration. */
  print_resource_info(state, NULL);

  /* Start the initial set of workers. */
  for (int i = 0; i < num_workers; ++i) {
    start_worker(state, NIL_ACTOR_ID);
  }

  return state;
}

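/**
 * Add or subtract a task's resource requirements from the local scheduler's
 * vector of dynamic (currently available) resources.
 *
 * For example (illustrative): leasing resources for a task that requires two
 * CPUs (return_resources = false) subtracts 2 from
 * dynamic_resources[ResourceIndex_CPU]; when the task finishes, the same call
 * with return_resources = true adds the 2 back.
 *
 * @param state The local scheduler state.
 * @param spec The specification of the task acquiring or returning resources.
 * @param return_resources True if the resources are being returned to the
 *        local scheduler, false if they are being leased to a task.
 * @return Void.
 */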
void update_dynamic_resources(LocalSchedulerState *state,
                              TaskSpec *spec,
                              bool return_resources) {
  for (int i = 0; i < ResourceIndex_MAX; ++i) {
    double resource = TaskSpec_get_required_resource(spec, i);
    if (!return_resources) {
      /* If we are not returning resources, we are leasing them, so we want to
       * subtract the resource quantities from our accounting. */
      resource *= -1;
    }

    bool oversubscribed =
        (!return_resources && state->dynamic_resources[i] < 0);
    /* Add or subtract the task's resources from our count. */
    state->dynamic_resources[i] += resource;

    if ((!return_resources && state->dynamic_resources[i] < 0) &&
        !oversubscribed) {
      /* Log a warning if we are using more resources than we have been
       * allocated, and we weren't already oversubscribed. */
      LOG_WARN("local_scheduler dynamic resources dropped to %8.4f\t%8.4f\n",
               state->dynamic_resources[0], state->dynamic_resources[1]);
    }
    CHECK(state->dynamic_resources[i] <= state->static_resources[i]);
  }
  print_resource_info(state, spec);
}

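/* Assign a task to a worker: serialize the task spec into a GetTaskReply
 * flatbuffer and send it to the worker as an ExecuteTask message, lease the
 * task's resources, and record the task as in progress (and, if a database is
 * connected, in the global task table). */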
void assign_task_to_worker(LocalSchedulerState *state,
                           TaskSpec *spec,
                           int64_t task_spec_size,
                           LocalSchedulerClient *worker) {
  /* Construct a flatbuffer object to send to the worker. */
  flatbuffers::FlatBufferBuilder fbb;
  auto message =
      CreateGetTaskReply(fbb, fbb.CreateString((char *) spec, task_spec_size));
  fbb.Finish(message);

  if (write_message(worker->sock, MessageType_ExecuteTask, fbb.GetSize(),
                    (uint8_t *) fbb.GetBufferPointer()) < 0) {
    if (errno == EPIPE || errno == EBADF) {
      /* TODO(rkn): If this happens, the task should be added back to the task
       * queue. */
      LOG_WARN(
          "Failed to give task to worker on fd %d. The client may have hung "
          "up.",
          worker->sock);
    } else {
      LOG_FATAL("Failed to give task to client on fd %d.", worker->sock);
    }
  }

  /* Resource accounting:
   * Update dynamic resource vector in the local scheduler state. */
  update_dynamic_resources(state, spec, false);
  Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_RUNNING,
                          state->db ? get_db_client_id(state->db) : NIL_ID);
  /* Record which task this worker is executing. This will be freed in
   * process_message when the worker sends a GetTask message to the local
   * scheduler. */
  worker->task_in_progress = Task_copy(task);
  /* Update the global task table. */
  if (state->db != NULL) {
    task_table_update(state->db, task, NULL, NULL, NULL);
  } else {
    Task_free(task);
  }
}

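/* Handle a notification published by the plasma store on the subscription
 * socket: an object was either sealed (so it is now locally available) or
 * deleted, and the scheduling algorithm is informed accordingly. */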
void process_plasma_notification(event_loop *loop,
                                 int client_sock,
                                 void *context,
                                 int events) {
  LocalSchedulerState *state = (LocalSchedulerState *) context;
  /* Read the notification from Plasma. */
  uint8_t *notification = read_message_async(loop, client_sock);
  if (!notification) {
    /* The store has closed the socket. */
    LocalSchedulerState_free(state);
    LOG_FATAL(
        "Lost connection to the plasma store, local scheduler is exiting!");
  }
  auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
  ObjectID object_id = from_flatbuf(object_info->object_id());
  if (object_info->is_deletion()) {
    handle_object_removed(state, object_id);
  } else {
    handle_object_available(state, state->algorithm_state, object_id);
  }
  free(notification);
}

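/* The next several callbacks implement object reconstruction. The entry point
 * is reconstruct_object below: it looks up the object's locations in the
 * object table, then (if no copies exist) looks up the task that created the
 * object in the result table, and finally attempts to atomically mark that
 * task as TASK_STATUS_RECONSTRUCTING in the task table before resubmitting it.
 * Each step is asynchronous, so the logic is split across these callbacks. */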
void reconstruct_task_update_callback(Task *task,
                                      void *user_context,
                                      bool updated) {
  /* The task ID should be in the task table. */
  CHECK(task != NULL);
  if (!updated) {
    /* The test-and-set of the task's scheduling state failed, so the task was
     * either not finished yet, or it was already being reconstructed.
     * Suppress the reconstruction request. */
    return;
  }
  /* Otherwise, the test-and-set succeeded, so resubmit the task for execution
   * to ensure that reconstruction will happen. */
  LocalSchedulerState *state = (LocalSchedulerState *) user_context;
  TaskSpec *spec = Task_task_spec(task);
  /* If the task is an actor task, then we currently do not reconstruct it.
   * TODO(rkn): Handle this better. */
  CHECK(ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID));
  /* Resubmit the task. */
  handle_task_submitted(state, state->algorithm_state, spec,
                        Task_task_spec_size(task));
  /* Recursively reconstruct the task's inputs, if necessary. */
  for (int64_t i = 0; i < TaskSpec_num_args(spec); ++i) {
    if (TaskSpec_arg_by_ref(spec, i)) {
      ObjectID arg_id = TaskSpec_arg_id(spec, i);
      reconstruct_object(state, arg_id);
    }
  }
}

void reconstruct_put_task_update_callback(Task *task,
                                          void *user_context,
                                          bool updated) {
  CHECK(task != NULL);
  if (updated) {
    /* The update to TASK_STATUS_RECONSTRUCTING succeeded, so continue with
     * reconstruction as usual. */
    reconstruct_task_update_callback(task, user_context, updated);
    return;
  }

  /* An object created by `ray.put` was not able to be reconstructed, and the
   * workload will likely hang. Push an error to the appropriate driver. */
  LocalSchedulerState *state = (LocalSchedulerState *) user_context;
  TaskSpec *spec = Task_task_spec(task);
  FunctionID function = TaskSpec_function(spec);
  push_error(state->db, TaskSpec_driver_id(spec),
             PUT_RECONSTRUCTION_ERROR_INDEX, sizeof(function), function.id);
}

void reconstruct_evicted_result_lookup_callback(ObjectID reconstruct_object_id,
                                                TaskID task_id,
                                                bool is_put,
                                                void *user_context) {
  CHECKM(!IS_NIL_ID(task_id),
         "No task information found for object during reconstruction");
  LocalSchedulerState *state = (LocalSchedulerState *) user_context;

  task_table_test_and_update_callback done_callback;
  if (is_put) {
    /* If the evicted object was created through ray.put and the originating
     * task is still executing, it's very likely that the workload will hang
     * and the worker needs to be restarted. Else, the reconstruction behavior
     * is the same as for other evicted objects. */
    done_callback = reconstruct_put_task_update_callback;
  } else {
    done_callback = reconstruct_task_update_callback;
  }
  /* If there are no other instances of the task running, it's safe for us to
   * claim responsibility for reconstruction. */
  task_table_test_and_update(
      state->db, task_id, (TASK_STATUS_DONE | TASK_STATUS_LOST),
      TASK_STATUS_RECONSTRUCTING, NULL, done_callback, state);
}

void reconstruct_failed_result_lookup_callback(ObjectID reconstruct_object_id,
                                               TaskID task_id,
                                               bool is_put,
                                               void *user_context) {
  if (IS_NIL_ID(task_id)) {
    /* NOTE(swang): For some reason, the result table update sometimes happens
     * after this lookup returns, possibly due to concurrent clients. In most
     * cases, this is okay because the initial execution is probably still
     * pending, so for now, we log a warning and suppress reconstruction. */
    LOG_WARN(
        "No task information found for object during reconstruction (no object "
        "entry yet)");
    return;
  }
  LocalSchedulerState *state = (LocalSchedulerState *) user_context;
  /* If the task failed to finish, it's safe for us to claim responsibility for
   * reconstruction. */
  task_table_test_and_update(state->db, task_id, TASK_STATUS_LOST,
                             TASK_STATUS_RECONSTRUCTING, NULL,
                             reconstruct_task_update_callback, state);
}

void reconstruct_object_lookup_callback(ObjectID reconstruct_object_id,
                                        int manager_count,
                                        const char *manager_vector[],
                                        void *user_context) {
  LOG_DEBUG("Manager count was %d", manager_count);
  /* Only continue reconstruction if we find that the object doesn't exist on
   * any nodes. NOTE: This codepath is not responsible for checking if the
   * object table entry is up-to-date. */
  LocalSchedulerState *state = (LocalSchedulerState *) user_context;
  /* Look up the task that created the object in the result table. */
  if (manager_count == 0) {
    /* If the object was created and later evicted, we reconstruct the object
     * if and only if there are no other instances of the task running. */
    result_table_lookup(state->db, reconstruct_object_id, NULL,
                        reconstruct_evicted_result_lookup_callback,
                        (void *) state);
  } else if (manager_count == -1) {
    /* If the object has not been created yet, we reconstruct the object if and
     * only if the task that created the object failed to complete. */
    result_table_lookup(state->db, reconstruct_object_id, NULL,
                        reconstruct_failed_result_lookup_callback,
                        (void *) state);
  }
}

void reconstruct_object(LocalSchedulerState *state,
                        ObjectID reconstruct_object_id) {
  LOG_DEBUG("Starting reconstruction");
  /* TODO(swang): Track task lineage for puts. */
  CHECK(state->db != NULL);
  /* Determine if reconstruction is necessary by checking if the object exists
   * on a node. */
  object_table_lookup(state->db, reconstruct_object_id, NULL,
                      reconstruct_object_lookup_callback, (void *) state);
}

void send_client_register_reply(LocalSchedulerState *state,
                                LocalSchedulerClient *worker) {
  flatbuffers::FlatBufferBuilder fbb;
  auto message = CreateRegisterClientReply(fbb);
  fbb.Finish(message);

  /* Send the message to the client. */
  if (write_message(worker->sock, MessageType_RegisterClientReply,
                    fbb.GetSize(), fbb.GetBufferPointer()) < 0) {
    if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {
      /* Something went wrong, so kill the worker. */
      kill_worker(state, worker, false);
      LOG_WARN(
          "Failed to send register client reply to worker on fd %d. The "
          "client may have hung up.",
          worker->sock);
    } else {
      LOG_FATAL("Failed to send register client reply to client on fd %d.",
                worker->sock);
    }
  }
}

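/* Process a RegisterClientRequest from a newly connected client. For a worker,
 * this records the worker's pid, associates the worker with an actor if one is
 * running on it, and marks the worker as a child process if it was started by
 * this local scheduler. Drivers currently require no additional setup. */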
void handle_client_register(LocalSchedulerState *state,
                            LocalSchedulerClient *worker,
                            const RegisterClientRequest *message) {
  /* Register the worker or driver. */
  if (message->is_worker()) {
    /* Update the actor mapping with the actor ID of the worker (if an actor is
     * running on the worker). */
    worker->pid = message->worker_pid();
    ActorID actor_id = from_flatbuf(message->actor_id());
    if (!ActorID_equal(actor_id, NIL_ACTOR_ID)) {
      /* Make sure that the local scheduler is aware that it is responsible for
       * this actor. */
      actor_map_entry *entry;
      HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry);
      CHECK(entry != NULL);
      CHECK(DBClientID_equal(entry->local_scheduler_id,
                             get_db_client_id(state->db)));
      /* Update the worker struct with this actor ID. */
      CHECK(ActorID_equal(worker->actor_id, NIL_ACTOR_ID));
      worker->actor_id = actor_id;
      /* Let the scheduling algorithm process the presence of this new
       * worker. */
      handle_actor_worker_connect(state, state->algorithm_state, actor_id,
                                  worker);
    }

    /* Register worker process id with the scheduler. */
    /* Determine if this worker is one of our child processes. */
    LOG_DEBUG("PID is %d", worker->pid);
    auto it = std::find(state->child_pids.begin(), state->child_pids.end(),
                        worker->pid);
    if (it != state->child_pids.end()) {
      /* If this worker is one of our child processes, mark it as a child so
       * that we know that we can wait for the process to exit during
       * cleanup. */
      worker->is_child = true;
      state->child_pids.erase(it);
      LOG_DEBUG("Found matching child pid %d", worker->pid);
    }
  } else {
    /* Register the driver. Currently we don't do anything here. */
  }
}

/* End of the cleanup code. */

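/* Dispatch a message received from a connected worker or driver. The message
 * type determines the action taken: task submission, client registration,
 * worker availability (GetTask), reconstruction requests, blocking and
 * unblocking notifications, put notifications, and client disconnects. */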
void process_message(event_loop *loop,
                     int client_sock,
                     void *context,
                     int events) {
  LocalSchedulerClient *worker = (LocalSchedulerClient *) context;
  LocalSchedulerState *state = worker->local_scheduler_state;

  int64_t type;
  int64_t length = read_buffer(client_sock, &type, state->input_buffer);

  LOG_DEBUG("New event of type %" PRId64, type);

  switch (type) {
  case MessageType_SubmitTask: {
    TaskSpec *spec = (TaskSpec *) utarray_front(state->input_buffer);
    /* Update the result table, which holds mappings of object ID -> ID of the
     * task that created it. */
    if (state->db != NULL) {
      TaskID task_id = TaskSpec_task_id(spec);
      for (int64_t i = 0; i < TaskSpec_num_returns(spec); ++i) {
        ObjectID return_id = TaskSpec_return(spec, i);
        result_table_add(state->db, return_id, task_id, false, NULL, NULL,
                         NULL);
      }
    }

    /* Handle the task submission. */
    if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) {
      handle_task_submitted(state, state->algorithm_state, spec, length);
    } else {
      handle_actor_task_submitted(state, state->algorithm_state, spec, length);
    }

  } break;
  case MessageType_TaskDone: {
  } break;
  case MessageType_EventLogMessage: {
    /* Parse the message. */
    auto message = flatbuffers::GetRoot<EventLogMessage>(
        utarray_front(state->input_buffer));
    if (state->db != NULL) {
      RayLogger_log_event(
          state->db, (uint8_t *) message->key()->data(), message->key()->size(),
          (uint8_t *) message->value()->data(), message->value()->size());
    }
  } break;
  case MessageType_RegisterClientRequest: {
    auto message = flatbuffers::GetRoot<RegisterClientRequest>(
        utarray_front(state->input_buffer));
    handle_client_register(state, worker, message);
    send_client_register_reply(state, worker);
  } break;
  case MessageType_GetTask: {
    /* If this worker reports a completed task: account for resources. */
    if (worker->task_in_progress != NULL) {
      TaskSpec *spec = Task_task_spec(worker->task_in_progress);
      /* Return dynamic resources back for the task in progress. */
      update_dynamic_resources(state, spec, true);
      /* If we're connected to Redis, update tables. */
      if (state->db != NULL) {
        /* Update control state tables. */
        Task_set_state(worker->task_in_progress, TASK_STATUS_DONE);
        task_table_update(state->db, worker->task_in_progress, NULL, NULL,
                          NULL);
        /* The call to task_table_update takes ownership of the
         * task_in_progress, so we set the pointer to NULL so it is not used. */
      } else {
        Task_free(worker->task_in_progress);
      }
      worker->task_in_progress = NULL;
    }
    /* Let the scheduling algorithm process the fact that there is an available
     * worker. */
    if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
      handle_worker_available(state, state->algorithm_state, worker);
    } else {
      handle_actor_worker_available(state, state->algorithm_state, worker);
    }
  } break;
  case MessageType_ReconstructObject: {
    auto message = flatbuffers::GetRoot<ReconstructObject>(
        utarray_front(state->input_buffer));
    if (worker->task_in_progress != NULL && !worker->is_blocked) {
      /* TODO(swang): For now, we don't handle blocked actors. */
      if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
        /* If the worker was executing a task (i.e. non-driver) and it wasn't
         * already blocked on an object that's not locally available, update its
         * state to blocked. */
        handle_worker_blocked(state, state->algorithm_state, worker);
        print_worker_info("Reconstructing", state->algorithm_state);
      }
    }
    reconstruct_object(state, from_flatbuf(message->object_id()));
  } break;
  case DISCONNECT_CLIENT: {
    LOG_INFO("Disconnecting client on fd %d", client_sock);
    kill_worker(state, worker, false);
    if (!ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
      /* Let the scheduling algorithm process the absence of this worker. */
      handle_actor_worker_disconnect(state, state->algorithm_state,
                                     worker->actor_id);
    }
  } break;
  case MessageType_NotifyUnblocked: {
    if (worker->task_in_progress != NULL) {
      /* TODO(swang): For now, we don't handle blocked actors. */
      if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) {
        /* If the worker was executing a task (i.e. non-driver), update its
         * state to not blocked. */
        CHECK(worker->is_blocked);
        handle_worker_unblocked(state, state->algorithm_state, worker);
      }
    }
    print_worker_info("Worker unblocked", state->algorithm_state);
  } break;
  case MessageType_PutObject: {
    auto message =
        flatbuffers::GetRoot<PutObject>(utarray_front(state->input_buffer));
    result_table_add(state->db, from_flatbuf(message->object_id()),
                     from_flatbuf(message->task_id()), true, NULL, NULL, NULL);
  } break;
  default:
    /* This code should be unreachable. */
    CHECK(0);
  }
}

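/* Accept a new connection on the local scheduler's listening socket, create a
 * LocalSchedulerClient struct for it, and register process_message as the
 * handler for messages arriving on the new socket. */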
void new_client_connection(event_loop *loop,
                           int listener_sock,
                           void *context,
                           int events) {
  LocalSchedulerState *state = (LocalSchedulerState *) context;
  int new_socket = accept_client(listener_sock);
  /* Create a struct for this worker. This will be freed when we free the local
   * scheduler state. */
  LocalSchedulerClient *worker = new LocalSchedulerClient();
  worker->sock = new_socket;
  worker->task_in_progress = NULL;
  worker->is_blocked = false;
  worker->pid = 0;
  worker->is_child = false;
  worker->actor_id = NIL_ACTOR_ID;
  worker->local_scheduler_state = state;
  state->workers.push_back(worker);
  event_loop_add_file(loop, new_socket, EVENT_LOOP_READ, process_message,
                      worker);
  LOG_DEBUG("new connection with fd %d", new_socket);
}

/* We need this code so we can clean up when we get a SIGTERM signal. */

LocalSchedulerState *g_state = NULL;

void signal_handler(int signal) {
  LOG_DEBUG("Signal was %d", signal);
  if (signal == SIGTERM) {
    /* NOTE(swang): This call removes the SIGTERM handler to ensure that we
     * free the local scheduler state at most once. If another SIGTERM is
     * caught during this call, there is the possibility of orphan worker
     * processes. */
    if (g_state) {
      LocalSchedulerState_free(g_state);
    }
    exit(0);
  }
}

void handle_task_scheduled_callback(Task *original_task,
                                    void *subscribe_context) {
  LocalSchedulerState *state = (LocalSchedulerState *) subscribe_context;
  TaskSpec *spec = Task_task_spec(original_task);
  if (ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) {
    /* This task does not involve an actor. Handle it normally. */
    handle_task_scheduled(state, state->algorithm_state, spec,
                          Task_task_spec_size(original_task));
  } else {
    /* This task involves an actor. Call the scheduling algorithm's actor
     * handler. */
    handle_actor_task_scheduled(state, state->algorithm_state, spec,
                                Task_task_spec_size(original_task));
  }
}

/**
 * Process a notification about the creation of a new actor. Use this to update
 * the mapping from actor ID to the local scheduler ID of the local scheduler
 * that is responsible for the actor. If this local scheduler is responsible for
 * the actor, then launch a new worker process to create that actor.
 *
 * @param info Information about the actor, including the ID of the actor being
 *        created and the ID of the local scheduler that is responsible for
 *        creating it.
 * @param context The local scheduler state.
 * @return Void.
 */
void handle_actor_creation_callback(ActorInfo info, void *context) {
  ActorID actor_id = info.actor_id;
  DBClientID local_scheduler_id = info.local_scheduler_id;
  LocalSchedulerState *state = (LocalSchedulerState *) context;
  /* Make sure the actor entry is not already present in the actor map table.
   * TODO(rkn): We will need to remove this check to handle the case where the
   * corresponding publish is retried and the case in which a task that creates
   * an actor is resubmitted due to fault tolerance. */
  actor_map_entry *entry;
  HASH_FIND(hh, state->actor_mapping, &actor_id, sizeof(actor_id), entry);
  CHECK(entry == NULL);
  /* Create a new entry and add it to the actor mapping table. TODO(rkn):
   * Currently this is never removed (except when the local scheduler state is
   * deleted). */
  entry = (actor_map_entry *) malloc(sizeof(actor_map_entry));
  entry->actor_id = actor_id;
  entry->local_scheduler_id = local_scheduler_id;
  HASH_ADD(hh, state->actor_mapping, actor_id, sizeof(entry->actor_id), entry);
  /* If this local scheduler is responsible for the actor, then start a new
   * worker for the actor. */
  if (DBClientID_equal(local_scheduler_id, get_db_client_id(state->db))) {
    start_worker(state, actor_id);
  }
  /* Let the scheduling algorithm process the fact that a new actor has been
   * created. */
  handle_actor_creation_notification(state, state->algorithm_state, actor_id);
}

int heartbeat_handler(event_loop *loop, timer_id id, void *context) {
  LocalSchedulerState *state = (LocalSchedulerState *) context;
  SchedulingAlgorithmState *algorithm_state = state->algorithm_state;
  LocalSchedulerInfo info;
  /* Ask the scheduling algorithm to fill out the scheduler info struct. */
  provide_scheduler_info(state, algorithm_state, &info);
  /* Publish the heartbeat to all subscribers of the local scheduler table. */
  local_scheduler_table_send_info(state->db, &info, NULL);
  /* Reset the timer. */
  return HEARTBEAT_TIMEOUT_MILLISECONDS;
}

void start_server(const char *node_ip_address,
                  const char *socket_name,
                  const char *redis_addr,
                  int redis_port,
                  const char *plasma_store_socket_name,
                  const char *plasma_manager_socket_name,
                  const char *plasma_manager_address,
                  bool global_scheduler_exists,
                  const double static_resource_conf[],
                  const char *start_worker_command,
                  int num_workers) {
  /* Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
   * to a client that has already died, the local scheduler could die. */
  signal(SIGPIPE, SIG_IGN);
  int fd = bind_ipc_sock(socket_name, true);
  event_loop *loop = event_loop_create();
  g_state = LocalSchedulerState_init(
      node_ip_address, loop, redis_addr, redis_port, socket_name,
      plasma_store_socket_name, plasma_manager_socket_name,
      plasma_manager_address, global_scheduler_exists, static_resource_conf,
      start_worker_command, num_workers);
  /* Register a callback for registering new clients. */
  event_loop_add_file(loop, fd, EVENT_LOOP_READ, new_client_connection,
                      g_state);
  /* Subscribe to receive notifications about tasks that are assigned to this
   * local scheduler by the global scheduler or by other local schedulers.
   * TODO(rkn): we also need to get any tasks that were assigned to this local
   * scheduler before the call to subscribe. */
  if (g_state->db != NULL) {
    task_table_subscribe(g_state->db, get_db_client_id(g_state->db),
                         TASK_STATUS_SCHEDULED, handle_task_scheduled_callback,
                         g_state, NULL, NULL, NULL);
  }
  /* Subscribe to notifications about newly created actors. */
  if (g_state->db != NULL) {
    actor_notification_table_subscribe(
        g_state->db, handle_actor_creation_callback, g_state, NULL);
  }
  /* Create a timer for publishing information about the load on the local
   * scheduler to the local scheduler table. This message also serves as a
   * heartbeat. */
  if (g_state->db != NULL) {
    event_loop_add_timer(loop, HEARTBEAT_TIMEOUT_MILLISECONDS,
                         heartbeat_handler, g_state);
  }
  /* Create a timer for fetching queued tasks' missing object dependencies. */
  event_loop_add_timer(loop, LOCAL_SCHEDULER_FETCH_TIMEOUT_MILLISECONDS,
                       fetch_object_timeout_handler, g_state);
  /* Run event loop. */
  event_loop_run(loop);
}

/* Only declare the main function if we are not in testing mode, since the test
 * suite has its own declaration of main. */
#ifndef LOCAL_SCHEDULER_TEST
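/* Example invocation (illustrative values; see the option handling below):
 *   local_scheduler -s /tmp/scheduler_socket -p /tmp/plasma_store_socket \
 *       -m /tmp/plasma_manager_socket -h 127.0.0.1 -r 127.0.0.1:6379 \
 *       -c 4,1 -n 4 -w "python default_worker.py" */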
int main(int argc, char *argv[]) {
  signal(SIGTERM, signal_handler);
  /* Path of the listening socket of the local scheduler. */
  char *scheduler_socket_name = NULL;
  /* IP address and port of redis. */
  char *redis_addr_port = NULL;
  /* Socket name for the local Plasma store. */
  char *plasma_store_socket_name = NULL;
  /* Socket name for the local Plasma manager. */
  char *plasma_manager_socket_name = NULL;
  /* Address for the plasma manager associated with this local scheduler
   * instance. */
  char *plasma_manager_address = NULL;
  /* The IP address of the node that this local scheduler is running on. */
  char *node_ip_address = NULL;
  /* Comma-separated list of configured resource capabilities for this node. */
  char *static_resource_list = NULL;
  double static_resource_conf[ResourceIndex_MAX];
  /* The command to run when starting new workers. */
  char *start_worker_command = NULL;
  /* The number of workers to start. */
  char *num_workers_str = NULL;
  int c;
  bool global_scheduler_exists = true;
  while ((c = getopt(argc, argv, "s:r:p:m:ga:h:c:w:n:")) != -1) {
    switch (c) {
    case 's':
      scheduler_socket_name = optarg;
      break;
    case 'r':
      redis_addr_port = optarg;
      break;
    case 'p':
      plasma_store_socket_name = optarg;
      break;
    case 'm':
      plasma_manager_socket_name = optarg;
      break;
    case 'g':
      global_scheduler_exists = false;
      break;
    case 'a':
      plasma_manager_address = optarg;
      break;
    case 'h':
      node_ip_address = optarg;
      break;
    case 'c':
      static_resource_list = optarg;
      break;
    case 'w':
      start_worker_command = optarg;
      break;
    case 'n':
      num_workers_str = optarg;
      break;
    default:
      LOG_FATAL("unknown option %c", c);
    }
  }
  if (!static_resource_list) {
    /* Use defaults for this node's static resource configuration. */
    memset(&static_resource_conf[0], 0, sizeof(static_resource_conf));
    static_resource_conf[ResourceIndex_CPU] = DEFAULT_NUM_CPUS;
    static_resource_conf[ResourceIndex_GPU] = DEFAULT_NUM_GPUS;
  } else {
    /* Tokenize the string. */
    const char delim[2] = ",";
    char *token;
    int idx = 0; /* Index into the resource vector. */
    token = strtok(static_resource_list, delim);
    while (token != NULL && idx < ResourceIndex_MAX) {
      static_resource_conf[idx++] = atoi(token);
      /* Attempt to get the next token. */
      token = strtok(NULL, delim);
    }
  }
  if (!scheduler_socket_name) {
    LOG_FATAL("please specify socket for incoming connections with -s switch");
  }
  if (!plasma_store_socket_name) {
    LOG_FATAL(
        "please specify socket for connecting to Plasma store with -p switch");
  }
  if (!node_ip_address) {
    LOG_FATAL("please specify the node IP address with -h switch");
  }
  int num_workers = 0;
  if (num_workers_str) {
    num_workers = strtol(num_workers_str, NULL, 10);
    if (num_workers < 0) {
      LOG_FATAL("Number of workers must be nonnegative");
    }
  }

  char *redis_addr = NULL;
  int redis_port = -1;
  /* Buffers for the parsed Redis address. These are declared at function scope
   * so that redis_addr remains valid when it is passed to start_server. */
  char redis_addr_buffer[16] = {0};
  char redis_port_str[6] = {0};
  if (!redis_addr_port) {
    /* Start the local scheduler without connecting to Redis. In this case, all
     * submitted tasks will be queued and scheduled locally. */
    if (plasma_manager_socket_name) {
      LOG_FATAL(
          "if a plasma manager socket name is provided with the -m switch, "
          "then a redis address must be provided with the -r switch");
    }
  } else {
    /* Parse the Redis address into an IP address and a port. */
    int num_assigned = sscanf(redis_addr_port, "%15[0-9.]:%5[0-9]",
                              redis_addr_buffer, redis_port_str);
    if (num_assigned != 2) {
      LOG_FATAL(
          "if a redis address is provided with the -r switch, it should be "
          "formatted like 127.0.0.1:6379");
    }
    redis_addr = redis_addr_buffer;
    redis_port = strtol(redis_port_str, NULL, 10);
    if (redis_port == 0) {
      LOG_FATAL("Unable to parse port number from redis address %s",
                redis_addr_port);
    }
    if (!plasma_manager_socket_name) {
      LOG_FATAL(
          "please specify socket for connecting to Plasma manager with -m "
          "switch");
    }
  }

  start_server(node_ip_address, scheduler_socket_name, redis_addr, redis_port,
               plasma_store_socket_name, plasma_manager_socket_name,
               plasma_manager_address, global_scheduler_exists,
               static_resource_conf, start_worker_command, num_workers);
}
#endif