This solves a problem where exports were sent to workers out of order. (#430)

* Revert queueing of exports in scheduler.

* Export each category of exports to workers in the order that they were received.

* Small fixes.
This commit is contained in:
Robert Nishihara 2016-09-14 17:18:02 -07:00 committed by Philipp Moritz
parent d5815673a5
commit 9ecf72484b
2 changed files with 59 additions and 147 deletions

View file

@ -279,7 +279,7 @@ Status SchedulerService::RegisterWorker(ServerContext* context, const RegisterWo
(*workers)[workerid].objstoreid = objstoreid;
(*workers)[workerid].worker_stub = WorkerService::NewStub(channel);
(*workers)[workerid].worker_address = worker_address;
(*workers)[workerid].initialized = false;
(*workers)[workerid].initial_exports_done = false;
if (is_driver) {
(*workers)[workerid].current_task = ROOT_OPERATION; // We use this field to identify which workers are drivers.
} else {
@ -384,16 +384,10 @@ Status SchedulerService::ReadyForNewTask(ServerContext* context, const ReadyForN
{
// Check if the worker has been initialized yet, and if not, then give it
// all of the exported functions and all of the exported reusable variables.
if (!(*workers)[workerid].initialized) {
// This should only happen once.
// Queue up all functions to run on the worker.
add_all_functions_to_run_to_worker_queue(workerid);
// Queue up all remote functions to be imported on the worker.
add_all_remote_functions_to_worker_export_queue(workerid);
// Queue up all reusable variables to be imported on the worker.
add_all_reusable_variables_to_worker_export_queue(workerid);
// Mark the worker as initialized.
(*workers)[workerid].initialized = true;
if (!(*workers)[workerid].initial_exports_done) {
// This only needs to happen for this specific worker and not for all
// workers.
export_everything_to_all_workers_if_necessary(workers);
}
}
(*workers)[workerid].current_task = NO_OPERATION; // clear operation ID
@ -543,54 +537,44 @@ Status SchedulerService::KillWorkers(ServerContext* context, const KillWorkersRe
}
Status SchedulerService::RunFunctionOnAllWorkers(ServerContext* context, const RunFunctionOnAllWorkersRequest* request, AckReply* reply) {
{
auto workers = GET(workers_);
auto function_to_run_queue = GET(function_to_run_queue_);
auto exported_functions_to_run = GET(exported_functions_to_run_);
exported_functions_to_run->push_back(std::unique_ptr<Function>(new Function(request->function())));
for (WorkerId workerid = 0; workerid < workers->size(); ++workerid) {
if ((*workers)[workerid].current_task != ROOT_OPERATION) {
function_to_run_queue->push(std::make_pair(workerid, exported_functions_to_run->size() - 1));
}
auto workers = GET(workers_);
export_everything_to_all_workers_if_necessary(workers);
auto exported_functions_to_run = GET(exported_functions_to_run_);
// TODO(rkn): Does this do a deep copy?
exported_functions_to_run->push_back(std::unique_ptr<Function>(new Function(request->function())));
for (size_t i = 0; i < workers->size(); ++i) {
if ((*workers)[i].current_task != ROOT_OPERATION) {
export_function_to_run_to_worker(i, exported_functions_to_run->size() - 1, workers, exported_functions_to_run);
}
}
schedule();
return Status::OK;
}
Status SchedulerService::ExportRemoteFunction(ServerContext* context, const ExportRemoteFunctionRequest* request, AckReply* reply) {
{
auto workers = GET(workers_);
auto remote_function_export_queue = GET(remote_function_export_queue_);
auto exported_functions = GET(exported_functions_);
// TODO(rkn): Does this do a deep copy?
exported_functions->push_back(std::unique_ptr<Function>(new Function(request->function())));
for (WorkerId workerid = 0; workerid < workers->size(); ++workerid) {
if ((*workers)[workerid].current_task != ROOT_OPERATION) {
// Add this workerid and remote function pair to the export queue.
remote_function_export_queue->push(std::make_pair(workerid, exported_functions->size() - 1));
}
auto workers = GET(workers_);
export_everything_to_all_workers_if_necessary(workers);
auto exported_remote_functions = GET(exported_remote_functions_);
// TODO(rkn): Does this do a deep copy?
exported_remote_functions->push_back(std::unique_ptr<Function>(new Function(request->function())));
for (size_t i = 0; i < workers->size(); ++i) {
if ((*workers)[i].current_task != ROOT_OPERATION) {
export_remote_function_to_worker(i, exported_remote_functions->size() - 1, workers, exported_remote_functions);
}
}
schedule();
return Status::OK;
}
Status SchedulerService::ExportReusableVariable(ServerContext* context, const ExportReusableVariableRequest* request, AckReply* reply) {
{
auto workers = GET(workers_);
auto reusable_variable_export_queue = GET(reusable_variable_export_queue_);
auto exported_reusable_variables = GET(exported_reusable_variables_);
// TODO(rkn): Does this do a deep copy?
exported_reusable_variables->push_back(std::unique_ptr<ReusableVar>(new ReusableVar(request->reusable_variable())));
for (WorkerId workerid = 0; workerid < workers->size(); ++workerid) {
if ((*workers)[workerid].current_task != ROOT_OPERATION) {
// Add this workerid and reusable variable pair to the export queue.
reusable_variable_export_queue->push(std::make_pair(workerid, exported_reusable_variables->size() - 1));
}
auto workers = GET(workers_);
export_everything_to_all_workers_if_necessary(workers);
auto exported_reusable_variables = GET(exported_reusable_variables_);
// TODO(rkn): Does this do a deep copy?
exported_reusable_variables->push_back(std::unique_ptr<ReusableVar>(new ReusableVar(request->reusable_variable())));
for (size_t i = 0; i < workers->size(); ++i) {
if ((*workers)[i].current_task != ROOT_OPERATION) {
export_reusable_variable_to_worker(i, exported_reusable_variables->size() - 1, workers, exported_reusable_variables);
}
}
schedule();
return Status::OK;
}
@ -654,18 +638,6 @@ void SchedulerService::deliver_object_async(ObjectID canonical_objectid, ObjStor
}
void SchedulerService::schedule() {
// Run functions on workers. This must happen before we schedule tasks in
// order to guarantee that remote function calls use the most up to date
// environment.
perform_functions_to_run();
// Export remote functions to the workers. This must happen before we schedule
// tasks in order to guarantee that remote function calls use the most up to
// date definitions.
perform_remote_function_exports();
// Export reusable variables to the workers. This must happen before we
// schedule tasks in order to guarantee that the workers have the definitions
// they need.
perform_reusable_variable_exports();
// See what we can do in get_queue_
perform_gets();
if (scheduling_algorithm_ == SCHEDULING_ALGORITHM_NAIVE) {
@ -850,39 +822,6 @@ bool SchedulerService::is_canonical(ObjectID objectid) {
return objectid == (*target_objectids)[objectid];
}
void SchedulerService::perform_functions_to_run() {
auto workers = GET(workers_);
auto function_to_run_queue = GET(function_to_run_queue_);
auto exported_functions_to_run = GET(exported_functions_to_run_);
while (!function_to_run_queue->empty()) {
std::pair<WorkerId, int> workerid_functionid_pair = function_to_run_queue->front();
export_function_to_run_to_worker(workerid_functionid_pair.first, workerid_functionid_pair.second, workers, exported_functions_to_run);
function_to_run_queue->pop();
}
}
void SchedulerService::perform_remote_function_exports() {
auto workers = GET(workers_);
auto remote_function_export_queue = GET(remote_function_export_queue_);
auto exported_functions = GET(exported_functions_);
while (!remote_function_export_queue->empty()) {
std::pair<WorkerId, int> workerid_functionid_pair = remote_function_export_queue->front();
export_function_to_worker(workerid_functionid_pair.first, workerid_functionid_pair.second, workers, exported_functions);
remote_function_export_queue->pop();
}
}
void SchedulerService::perform_reusable_variable_exports() {
auto workers = GET(workers_);
auto reusable_variable_export_queue = GET(reusable_variable_export_queue_);
auto exported_reusable_variables = GET(exported_reusable_variables_);
while (!reusable_variable_export_queue->empty()) {
std::pair<WorkerId, int> workerid_variableid_pair = reusable_variable_export_queue->front();
export_reusable_variable_to_worker(workerid_variableid_pair.first, workerid_variableid_pair.second, workers, exported_reusable_variables);
reusable_variable_export_queue->pop();
}
}
void SchedulerService::perform_gets() {
auto get_queue = GET(get_queue_);
// Complete all get tasks that can be completed.
@ -1156,11 +1095,11 @@ void SchedulerService::export_function_to_run_to_worker(WorkerId workerid, int f
RAY_CHECK_GRPC((*workers)[workerid].worker_stub->RunFunctionOnWorker(&context, request, &reply));
}
void SchedulerService::export_function_to_worker(WorkerId workerid, int function_index, MySynchronizedPtr<std::vector<WorkerHandle> > &workers, const MySynchronizedPtr<std::vector<std::unique_ptr<Function> > > &exported_functions) {
void SchedulerService::export_remote_function_to_worker(WorkerId workerid, int function_index, MySynchronizedPtr<std::vector<WorkerHandle> > &workers, const MySynchronizedPtr<std::vector<std::unique_ptr<Function> > > &exported_remote_functions) {
RAY_LOG(RAY_INFO, "exporting remote function with index " << function_index << " to worker " << workerid);
ClientContext context;
ImportRemoteFunctionRequest request;
request.mutable_function()->CopyFrom(*(*exported_functions)[function_index].get());
request.mutable_function()->CopyFrom(*(*exported_remote_functions)[function_index].get());
AckReply reply;
RAY_CHECK_GRPC((*workers)[workerid].worker_stub->ImportRemoteFunction(&context, request, &reply));
}
@ -1174,27 +1113,28 @@ void SchedulerService::export_reusable_variable_to_worker(WorkerId workerid, int
RAY_CHECK_GRPC((*workers)[workerid].worker_stub->ImportReusableVariable(&context, request, &reply));
}
void SchedulerService::add_all_functions_to_run_to_worker_queue(WorkerId workerid) {
auto function_to_run_queue = GET(function_to_run_queue_);
void SchedulerService::export_everything_to_all_workers_if_necessary(MySynchronizedPtr<std::vector<WorkerHandle> > &workers) {
auto exported_functions_to_run = GET(exported_functions_to_run_);
for (int i = 0; i < exported_functions_to_run->size(); ++i) {
function_to_run_queue->push(std::make_pair(workerid, i));
}
}
void SchedulerService::add_all_remote_functions_to_worker_export_queue(WorkerId workerid) {
auto remote_function_export_queue = GET(remote_function_export_queue_);
auto exported_functions = GET(exported_functions_);
for (int i = 0; i < exported_functions->size(); ++i) {
remote_function_export_queue->push(std::make_pair(workerid, i));
}
}
void SchedulerService::add_all_reusable_variables_to_worker_export_queue(WorkerId workerid) {
auto reusable_variable_export_queue = GET(reusable_variable_export_queue_);
auto exported_remote_functions = GET(exported_remote_functions_);
auto exported_reusable_variables = GET(exported_reusable_variables_);
for (int i = 0; i < exported_reusable_variables->size(); ++i) {
reusable_variable_export_queue->push(std::make_pair(workerid, i));
for (size_t workerid = 0; workerid < workers->size(); ++workerid) {
if ((*workers)[workerid].current_task != ROOT_OPERATION && !(*workers)[workerid].initial_exports_done) {
// Export the functions to run to the worker.
for (int i = 0; i < exported_functions_to_run->size(); ++i) {
export_function_to_run_to_worker(workerid, i, workers, exported_functions_to_run);
}
// Export the remote functions to the worker.
for (int i = 0; i < exported_remote_functions->size(); ++i) {
export_remote_function_to_worker(workerid, i, workers, exported_remote_functions);
}
// Export the reusable variables to the worker.
for (int i = 0; i < exported_reusable_variables->size(); ++i) {
export_reusable_variable_to_worker(workerid, i, workers, exported_reusable_variables);
}
// Record that we have done this so we do not need to do it again for this
// worker.
(*workers)[workerid].initial_exports_done = true;
}
}
}

View file

@ -2,7 +2,6 @@
#define RAY_SCHEDULER_H
#include <queue>
#include <deque>
#include <memory>
#include <algorithm>
@ -39,9 +38,8 @@ struct WorkerHandle {
ObjStoreId objstoreid;
std::string worker_address;
// This field is initialized to false, and it is set to true after all of the
// exported functions and exported reusable variables have been shipped to
// this worker.
bool initialized;
// initial exports have been shipped to this worker.
bool initial_exports_done;
OperationId current_task;
};
@ -120,12 +118,6 @@ private:
ObjStoreId pick_objstore(ObjectID objectid);
// checks if objectid is a canonical objectid
bool is_canonical(ObjectID objectid);
// Export all queued up functions to run.
void perform_functions_to_run();
// Export all queued up remote functions.
void perform_remote_function_exports();
// Export all queued up reusable variables.
void perform_reusable_variable_exports();
// Perform all queued up gets that can be performed.
void perform_gets();
// schedule tasks using the naive algorithm
@ -155,18 +147,13 @@ private:
// Export a function to run to a worker.
void export_function_to_run_to_worker(WorkerId workerid, int function_index, MySynchronizedPtr<std::vector<WorkerHandle> > &workers, const MySynchronizedPtr<std::vector<std::unique_ptr<Function> > > &exported_functions_to_run);
// Export a remote function to a worker.
void export_function_to_worker(WorkerId workerid, int function_index, MySynchronizedPtr<std::vector<WorkerHandle> > &workers, const MySynchronizedPtr<std::vector<std::unique_ptr<Function> > > &exported_functions);
void export_remote_function_to_worker(WorkerId workerid, int function_index, MySynchronizedPtr<std::vector<WorkerHandle> > &workers, const MySynchronizedPtr<std::vector<std::unique_ptr<Function> > > &exported_remote_functions);
// Export a reusable variable to a worker
void export_reusable_variable_to_worker(WorkerId workerid, int reusable_variable_index, MySynchronizedPtr<std::vector<WorkerHandle> > &workers, const MySynchronizedPtr<std::vector<std::unique_ptr<ReusableVar> > > &exported_reusable_variables);
// Add to the function to run export queue the job of exporting all functions
// to run to the given worker. This is used when a new worker registers.
void add_all_functions_to_run_to_worker_queue(WorkerId workerid);
// Add to the remote function export queue the job of exporting all remote
// functions to the given worker. This is used when a new worker registers.
void add_all_remote_functions_to_worker_export_queue(WorkerId workerid);
// Add to the reusable variable export queue the job of exporting all reusable
// variables to the given worker. This is used when a new worker registers.
void add_all_reusable_variables_to_worker_export_queue(WorkerId workerid);
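// Send every recorded export (functions to run, remote functions, and
// reusable variables) to each non-driver worker that has not yet received its
// initial exports. This is called before any new export is sent to the
// workers, and when a worker calls ReadyForNewTask before its initial exports
// are done.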
void export_everything_to_all_workers_if_necessary(MySynchronizedPtr<std::vector<WorkerHandle> > &workers);
template<class T>
MySynchronizedPtr<T> get(Synchronized<T>& my_field, const char* name,unsigned int line_number);
@ -237,25 +224,10 @@ private:
// lock (objects_lock_). // TODO(rkn): Consider making this part of the
// objtable data structure.
std::vector<std::vector<ObjectID> > objects_in_transit_;
// List of pending functions to run on workers. These should be processed in a
// first in first out manner. The first element of each pair is the ID of the
// worker to run the function on, and the second element of each pair is the
// index of the function to run.
Synchronized<std::queue<std::pair<WorkerId, int> > > function_to_run_queue_;
// List of pending remote function exports. These should be processed in a
// first in first out manner. The first element of each pair is the ID of the
// worker to export the remote function to, and the second element of each
// pair is the index of the function to export.
Synchronized<std::queue<std::pair<WorkerId, int> > > remote_function_export_queue_;
// List of pending reusable variable exports. These should be processed in a
// first in first out manner. The first element of each pair is the ID of the
// worker to export the reusable variable to, and the second element of each
// pair is the index of the reusable variable to export.
Synchronized<std::queue<std::pair<WorkerId, int> > > reusable_variable_export_queue_;
// All of the functions that have been exported to the workers to run.
Synchronized<std::vector<std::unique_ptr<Function> > > exported_functions_to_run_;
// All of the remote functions that have been exported to the workers.
Synchronized<std::vector<std::unique_ptr<Function> > > exported_functions_;
Synchronized<std::vector<std::unique_ptr<Function> > > exported_remote_functions_;
// All of the reusable variables that have been exported to the workers.
Synchronized<std::vector<std::unique_ptr<ReusableVar> > > exported_reusable_variables_;
// the scheduling algorithm that will be used