Merge pull request #181 from amplab/test

replace locks with synchronized data structures
mehrdadn 2016-06-30 01:16:51 +03:00 committed by GitHub
commit cdc286ed70
3 changed files with 245 additions and 294 deletions
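For orientation before the diff: the .get() and .unsafe_get() calls introduced below suggest a synchronized-container wrapper roughly like the sketch that follows. The names (Synchronized, Handle) and the exact API are assumptions for illustration, not the class this commit actually adds: get() returns an RAII handle that holds the container's mutex for as long as the handle lives, and unsafe_get() returns the raw container for callers that already hold every lock (see acquire_all_locks()/release_all_locks() near the end of the diff).

#include <map>
#include <mutex>
#include <string>
#include <vector>

// Sketch only: names and API are illustrative, not the repo's actual class.
template <typename T>
class Synchronized {
 public:
  class Handle {
   public:
    Handle(T* value, std::mutex* mutex) : value_(value), lock_(*mutex) {}
    T* operator->() const { return value_; }
    T& operator*() const { return *value_; }
   private:
    T* value_;
    std::unique_lock<std::mutex> lock_;  // released when the handle is destroyed
  };
  Handle get() { return Handle(&value_, &mutex_); }  // locking access
  T* unsafe_get() { return &value_; }                // caller must already synchronize
 private:
  T value_;
  std::mutex mutex_;
};

int main() {
  // The two usage patterns that appear throughout the diff:
  Synchronized<std::vector<int>> task_queue_;
  task_queue_.get()->push_back(42);        // temporary handle: locked for this statement only
  Synchronized<std::map<std::string, int>> fntable_;
  {
    auto fntable = fntable_.get();         // named handle: locked until the end of the block
    (*fntable)["remote_function"] = 2;
  }
  return 0;
}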


@ -12,9 +12,9 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ
std::unique_ptr<Task> task(new Task(request->task())); // need to copy, because request is const std::unique_ptr<Task> task(new Task(request->task())); // need to copy, because request is const
size_t num_return_vals; size_t num_return_vals;
{ {
std::lock_guard<std::mutex> fntable_lock(fntable_lock_); auto fntable = fntable_.get();
FnTable::const_iterator fn = fntable_.find(task->name()); FnTable::const_iterator fn = fntable->find(task->name());
if (fn == fntable_.end()) { if (fn == fntable->end()) {
num_return_vals = 0; num_return_vals = 0;
reply->set_function_registered(false); reply->set_function_registered(false);
} else { } else {
@ -31,27 +31,17 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ
result_objrefs.push_back(result); result_objrefs.push_back(result);
} }
{ {
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); // we grab this lock because increment_ref_count assumes it has been acquired auto reference_counts = reference_counts_.get(); // we grab this lock because increment_ref_count assumes it has been acquired
increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before we reply to the worker that called SubmitTask. The corresponding decrement will happen in submit_task in raylib. increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before we reply to the worker that called SubmitTask. The corresponding decrement will happen in submit_task in raylib.
increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before the task is scheduled on the worker. The corresponding decrement will happen in deserialize_task in raylib. increment_ref_count(result_objrefs); // We increment once so the objrefs don't go out of scope before the task is scheduled on the worker. The corresponding decrement will happen in deserialize_task in raylib.
} }
auto operation = std::unique_ptr<Operation>(new Operation()); auto operation = std::unique_ptr<Operation>(new Operation());
operation->set_allocated_task(task.release()); operation->set_allocated_task(task.release());
{
std::lock_guard<std::mutex> workers_lock(workers_lock_);
operation->set_creator_operationid(workers_[request->workerid()].current_task); operation->set_creator_operationid((*workers_.get())[request->workerid()].current_task);
}
OperationId operationid; OperationId operationid = computation_graph_.get()->add_operation(std::move(operation));
{
std::lock_guard<std::mutex> computation_graph_lock(computation_graph_lock_);
operationid = computation_graph_.add_operation(std::move(operation));
}
{
std::lock_guard<std::mutex> task_queue_lock(task_queue_lock_);
task_queue_.push_back(operationid); task_queue_.get()->push_back(operationid);
}
schedule(); schedule();
} }
return Status::OK; return Status::OK;
@ -66,17 +56,10 @@ Status SchedulerService::PutObj(ServerContext* context, const PutObjRequest* req
} }
Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequest* request, AckReply* reply) { Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequest* request, AckReply* reply) {
size_t size; size_t size = objtable_.get()->size();
{
std::lock_guard<std::mutex> objects_lock(objects_lock_);
size = objtable_.size();
}
ObjRef objref = request->objref(); ObjRef objref = request->objref();
RAY_CHECK_LT(objref, size, "internal error: no object with objref " << objref << " exists"); RAY_CHECK_LT(objref, size, "internal error: no object with objref " << objref << " exists");
{
std::lock_guard<std::mutex> get_queue_lock(get_queue_lock_);
get_queue_.push_back(std::make_pair(request->workerid(), objref)); get_queue_.get()->push_back(std::make_pair(request->workerid(), objref));
}
schedule(); schedule();
return Status::OK; return Status::OK;
} }
@ -86,25 +69,19 @@ Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefs
ObjRef target_objref = request->target_objref(); ObjRef target_objref = request->target_objref();
RAY_LOG(RAY_ALIAS, "Aliasing objref " << alias_objref << " with objref " << target_objref); RAY_LOG(RAY_ALIAS, "Aliasing objref " << alias_objref << " with objref " << target_objref);
RAY_CHECK_NEQ(alias_objref, target_objref, "internal error: attempting to alias objref " << alias_objref << " with itself."); RAY_CHECK_NEQ(alias_objref, target_objref, "internal error: attempting to alias objref " << alias_objref << " with itself.");
size_t size; size_t size = objtable_.get()->size();
{
std::lock_guard<std::mutex> objects_lock(objects_lock_);
size = objtable_.size();
}
RAY_CHECK_LT(alias_objref, size, "internal error: no object with objref " << alias_objref << " exists"); RAY_CHECK_LT(alias_objref, size, "internal error: no object with objref " << alias_objref << " exists");
RAY_CHECK_LT(target_objref, size, "internal error: no object with objref " << target_objref << " exists"); RAY_CHECK_LT(target_objref, size, "internal error: no object with objref " << target_objref << " exists");
{ {
std::lock_guard<std::mutex> target_objrefs_lock(target_objrefs_lock_); auto target_objrefs = target_objrefs_.get();
RAY_CHECK_EQ(target_objrefs_[alias_objref], UNITIALIZED_ALIAS, "internal error: attempting to alias objref " << alias_objref << " with objref " << target_objref << ", but objref " << alias_objref << " has already been aliased with objref " << target_objrefs_[alias_objref]); RAY_CHECK_EQ((*target_objrefs)[alias_objref], UNITIALIZED_ALIAS, "internal error: attempting to alias objref " << alias_objref << " with objref " << target_objref << ", but objref " << alias_objref << " has already been aliased with objref " << (*target_objrefs)[alias_objref]);
target_objrefs_[alias_objref] = target_objref; (*target_objrefs)[alias_objref] = target_objref;
}
{
std::lock_guard<std::mutex> reverse_target_objrefs_lock(reverse_target_objrefs_lock_);
reverse_target_objrefs_[target_objref].push_back(alias_objref);
} }
(*reverse_target_objrefs_.get())[target_objref].push_back(alias_objref);
{ {
// The corresponding increment was done in register_new_object. // The corresponding increment was done in register_new_object.
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); // we grab this lock because decrement_ref_count assumes it has been acquired auto reference_counts = reference_counts_.get(); // we grab this lock because decrement_ref_count assumes it has been acquired
auto contained_objrefs = contained_objrefs_.get(); // we grab this lock because decrement_ref_count assumes it has been acquired
decrement_ref_count(std::vector<ObjRef>({alias_objref})); decrement_ref_count(std::vector<ObjRef>({alias_objref}));
} }
schedule(); schedule();
@ -112,14 +89,14 @@ Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefs
} }
Status SchedulerService::RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) { Status SchedulerService::RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) {
std::lock_guard<std::mutex> objects_lock(objects_lock_); // to protect objects_in_transit_ auto objtable = objtable_.get(); // to protect objects_in_transit_
std::lock_guard<std::mutex> objstore_lock(objstores_lock_); auto objstores = objstores_.get();
ObjStoreId objstoreid = objstores_.size(); ObjStoreId objstoreid = objstores->size();
auto channel = grpc::CreateChannel(request->objstore_address(), grpc::InsecureChannelCredentials()); auto channel = grpc::CreateChannel(request->objstore_address(), grpc::InsecureChannelCredentials());
objstores_.push_back(ObjStoreHandle()); objstores->push_back(ObjStoreHandle());
objstores_[objstoreid].address = request->objstore_address(); (*objstores)[objstoreid].address = request->objstore_address();
objstores_[objstoreid].channel = channel; (*objstores)[objstoreid].channel = channel;
objstores_[objstoreid].objstore_stub = ObjStore::NewStub(channel); (*objstores)[objstoreid].objstore_stub = ObjStore::NewStub(channel);
reply->set_objstoreid(objstoreid); reply->set_objstoreid(objstoreid);
objects_in_transit_.push_back(std::vector<ObjRef>()); objects_in_transit_.push_back(std::vector<ObjRef>());
return Status::OK; return Status::OK;
@ -153,7 +130,8 @@ Status SchedulerService::ObjReady(ServerContext* context, const ObjReadyRequest*
// the corresponding increment was done in register_new_object in the // the corresponding increment was done in register_new_object in the
// scheduler. For all subsequent calls to ObjReady, the corresponding // scheduler. For all subsequent calls to ObjReady, the corresponding
// increment was done in deliver_object_if_necessary in the scheduler. // increment was done in deliver_object_if_necessary in the scheduler.
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); // we grab this lock because decrement_ref_count assumes it has been acquired auto reference_counts = reference_counts_.get(); // we grab this lock because decrement_ref_count assumes it has been acquired
auto contained_objrefs = contained_objrefs_.get(); // we grab this lock because decrement_ref_count assumes it has been acquired
decrement_ref_count(std::vector<ObjRef>({objref})); decrement_ref_count(std::vector<ObjRef>({objref}));
} }
schedule(); schedule();
@ -164,39 +142,28 @@ Status SchedulerService::ReadyForNewTask(ServerContext* context, const ReadyForN
RAY_LOG(RAY_INFO, "worker " << request->workerid() << " is ready for a new task"); RAY_LOG(RAY_INFO, "worker " << request->workerid() << " is ready for a new task");
if (request->has_previous_task_info()) { if (request->has_previous_task_info()) {
OperationId operationid; OperationId operationid;
{
std::lock_guard<std::mutex> workers_lock(workers_lock_);
operationid = workers_[request->workerid()].current_task; operationid = (*workers_.get())[request->workerid()].current_task;
}
std::string task_name; std::string task_name;
{
std::lock_guard<std::mutex> computation_graph_lock(computation_graph_lock_);
task_name = computation_graph_.get_task(operationid).name(); task_name = computation_graph_.get()->get_task(operationid).name();
}
TaskStatus info; TaskStatus info;
{ {
std::lock_guard<std::mutex> workers_lock(workers_lock_); auto workers = workers_.get();
operationid = workers_[request->workerid()].current_task; operationid = (*workers)[request->workerid()].current_task;
info.set_operationid(operationid); info.set_operationid(operationid);
info.set_function_name(task_name); info.set_function_name(task_name);
info.set_worker_address(workers_[request->workerid()].worker_address); info.set_worker_address((*workers)[request->workerid()].worker_address);
info.set_error_message(request->previous_task_info().error_message()); info.set_error_message(request->previous_task_info().error_message());
workers_[request->workerid()].current_task = NO_OPERATION; // clear operation ID (*workers)[request->workerid()].current_task = NO_OPERATION; // clear operation ID
} }
if (!request->previous_task_info().task_succeeded()) { if (!request->previous_task_info().task_succeeded()) {
RAY_LOG(RAY_INFO, "Error: Task " << info.operationid() << " executing function " << info.function_name() << " on worker " << request->workerid() << " failed with error message: " << info.error_message()); RAY_LOG(RAY_INFO, "Error: Task " << info.operationid() << " executing function " << info.function_name() << " on worker " << request->workerid() << " failed with error message: " << info.error_message());
std::lock_guard<std::mutex> failed_tasks_lock(failed_tasks_lock_);
failed_tasks_.push_back(info); failed_tasks_.get()->push_back(info);
} else { } else {
std::lock_guard<std::mutex> successful_tasks_lock(successful_tasks_lock_);
successful_tasks_.push_back(info.operationid()); successful_tasks_.get()->push_back(info.operationid());
} }
// TODO(rkn): Handle task failure // TODO(rkn): Handle task failure
} }
{
std::lock_guard<std::mutex> lock(avail_workers_lock_);
avail_workers_.push_back(request->workerid()); avail_workers_.get()->push_back(request->workerid());
}
schedule(); schedule();
return Status::OK; return Status::OK;
} }
@ -208,7 +175,7 @@ Status SchedulerService::IncrementRefCount(ServerContext* context, const Increme
for (int i = 0; i < num_objrefs; ++i) { for (int i = 0; i < num_objrefs; ++i) {
objrefs.push_back(request->objref(i)); objrefs.push_back(request->objref(i));
} }
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); // we grab this lock because increment_ref_count assumes it has been acquired auto reference_counts = reference_counts_.get(); // we grab this lock because increment_ref_count assumes it has been acquired
increment_ref_count(objrefs); increment_ref_count(objrefs);
return Status::OK; return Status::OK;
} }
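A pattern that recurs above and below: the handle (formerly the lock_guard) is acquired only so that a helper documented as assuming the lock is held can run safely; the handle variable itself is never used. The same convention in a self-contained form with a plain mutex (names are illustrative):

#include <mutex>
#include <vector>

std::mutex reference_counts_mutex;        // guards reference_counts
std::vector<int> reference_counts(10, 0);

// Precondition: reference_counts_mutex is held by the caller.
void increment_ref_count_locked(const std::vector<int>& objrefs) {
  for (size_t i = 0; i < objrefs.size(); ++i) {
    reference_counts[objrefs[i]] += 1;
  }
}

void IncrementRefCount(const std::vector<int>& objrefs) {
  std::lock_guard<std::mutex> guard(reference_counts_mutex);  // exists only to hold the lock
  increment_ref_count_locked(objrefs);
}

int main() {
  IncrementRefCount(std::vector<int>({0, 2}));
  return 0;
}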
@ -220,7 +187,8 @@ Status SchedulerService::DecrementRefCount(ServerContext* context, const Decreme
for (int i = 0; i < num_objrefs; ++i) { for (int i = 0; i < num_objrefs; ++i) {
objrefs.push_back(request->objref(i)); objrefs.push_back(request->objref(i));
} }
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); // we grab this lock, because decrement_ref_count assumes it has been acquired auto reference_counts = reference_counts_.get(); // we grab this lock, because decrement_ref_count assumes it has been acquired
auto contained_objrefs = contained_objrefs_.get(); // we grab this lock because decrement_ref_count assumes it has been acquired
decrement_ref_count(objrefs); decrement_ref_count(objrefs);
return Status::OK; return Status::OK;
} }
@ -231,10 +199,10 @@ Status SchedulerService::AddContainedObjRefs(ServerContext* context, const AddCo
// TODO(rkn): Perhaps we don't need this check. It won't work because the objstore may not have called ObjReady yet. // TODO(rkn): Perhaps we don't need this check. It won't work because the objstore may not have called ObjReady yet.
// RAY_LOG(RAY_FATAL, "Attempting to add contained objrefs for non-canonical objref " << objref); // RAY_LOG(RAY_FATAL, "Attempting to add contained objrefs for non-canonical objref " << objref);
// } // }
std::lock_guard<std::mutex> contained_objrefs_lock(contained_objrefs_lock_); auto contained_objrefs = contained_objrefs_.get();
RAY_CHECK_EQ(contained_objrefs_[objref].size(), 0, "Attempting to add contained objrefs for objref " << objref << ", but contained_objrefs_[objref].size() != 0."); RAY_CHECK_EQ((*contained_objrefs)[objref].size(), 0, "Attempting to add contained objrefs for objref " << objref << ", but contained_objrefs_[objref].size() != 0.");
for (int i = 0; i < request->contained_objref_size(); ++i) { for (int i = 0; i < request->contained_objref_size(); ++i) {
contained_objrefs_[objref].push_back(request->contained_objref(i)); (*contained_objrefs)[objref].push_back(request->contained_objref(i));
} }
return Status::OK; return Status::OK;
} }
@ -245,34 +213,34 @@ Status SchedulerService::SchedulerInfo(ServerContext* context, const SchedulerIn
} }
Status SchedulerService::TaskInfo(ServerContext* context, const TaskInfoRequest* request, TaskInfoReply* reply) { Status SchedulerService::TaskInfo(ServerContext* context, const TaskInfoRequest* request, TaskInfoReply* reply) {
std::lock_guard<std::mutex> successful_tasks_lock(successful_tasks_lock_); auto successful_tasks = successful_tasks_.get();
std::lock_guard<std::mutex> failed_tasks_lock(failed_tasks_lock_); auto failed_tasks = failed_tasks_.get();
std::lock_guard<std::mutex> computation_graph_lock(computation_graph_lock_); auto computation_graph = computation_graph_.get();
std::lock_guard<std::mutex> workers_lock(workers_lock_); auto workers = workers_.get();
for (int i = 0; i < failed_tasks_.size(); ++i) { for (int i = 0; i < failed_tasks->size(); ++i) {
TaskStatus* info = reply->add_failed_task(); TaskStatus* info = reply->add_failed_task();
*info = failed_tasks_[i]; *info = (*failed_tasks)[i];
} }
for (int i = 0; i < workers_.size(); ++i) { for (int i = 0; i < workers->size(); ++i) {
OperationId operationid = workers_[i].current_task; OperationId operationid = (*workers)[i].current_task;
if (operationid != NO_OPERATION) { if (operationid != NO_OPERATION) {
const Task& task = computation_graph_.get_task(operationid); const Task& task = computation_graph->get_task(operationid);
TaskStatus* info = reply->add_running_task(); TaskStatus* info = reply->add_running_task();
info->set_operationid(operationid); info->set_operationid(operationid);
info->set_function_name(task.name()); info->set_function_name(task.name());
info->set_worker_address(workers_[i].worker_address); info->set_worker_address((*workers)[i].worker_address);
} }
} }
reply->set_num_succeeded(successful_tasks_.size()); reply->set_num_succeeded(successful_tasks->size());
return Status::OK; return Status::OK;
} }
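TaskInfo holds four handles at once, so every call path that takes more than one of these locks has to take them in a consistent relative order to stay deadlock-free (do_on_locks near the end of the file fixes such an order). For comparison only, and not what this file does, the standard library can acquire several mutexes without an ordering convention via std::lock; a small self-contained example with illustrative names:

#include <mutex>
#include <vector>

std::mutex successful_tasks_mutex;
std::mutex failed_tasks_mutex;
std::vector<int> successful_tasks;
std::vector<int> failed_tasks;

void snapshot(std::vector<int>* succeeded, std::vector<int>* failed) {
  // std::lock acquires both mutexes deadlock-free regardless of the order
  // other threads use; adopt_lock hands ownership to the guards.
  std::lock(successful_tasks_mutex, failed_tasks_mutex);
  std::lock_guard<std::mutex> g1(successful_tasks_mutex, std::adopt_lock);
  std::lock_guard<std::mutex> g2(failed_tasks_mutex, std::adopt_lock);
  *succeeded = successful_tasks;
  *failed = failed_tasks;
}

int main() {
  std::vector<int> s, f;
  snapshot(&s, &f);
  return 0;
}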
void SchedulerService::deliver_object_if_necessary(ObjRef canonical_objref, ObjStoreId from, ObjStoreId to) { void SchedulerService::deliver_object_if_necessary(ObjRef canonical_objref, ObjStoreId from, ObjStoreId to) {
bool object_present_or_in_transit; bool object_present_or_in_transit;
{ {
std::lock_guard<std::mutex> objects_lock(objects_lock_); auto objtable = objtable_.get();
auto &objstores = objtable_[canonical_objref]; auto &locations = (*objtable)[canonical_objref];
bool object_present = std::binary_search(objstores.begin(), objstores.end(), to); bool object_present = std::binary_search(locations.begin(), locations.end(), to);
auto &objects_in_flight = objects_in_transit_[to]; auto &objects_in_flight = objects_in_transit_[to];
bool object_in_transit = (std::find(objects_in_flight.begin(), objects_in_flight.end(), canonical_objref) != objects_in_flight.end()); bool object_in_transit = (std::find(objects_in_flight.begin(), objects_in_flight.end(), canonical_objref) != objects_in_flight.end());
object_present_or_in_transit = object_present || object_in_transit; object_present_or_in_transit = object_present || object_in_transit;
@ -299,16 +267,16 @@ void SchedulerService::deliver_object(ObjRef canonical_objref, ObjStoreId from,
// We increment once so the objref doesn't go out of scope before the ObjReady // We increment once so the objref doesn't go out of scope before the ObjReady
// method is called. The corresponding decrement will happen in ObjReady in // method is called. The corresponding decrement will happen in ObjReady in
// the scheduler. // the scheduler.
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); // we grab this lock because increment_ref_count assumes it has been acquired auto reference_counts = reference_counts_.get(); // we grab this lock because increment_ref_count assumes it has been acquired
increment_ref_count(std::vector<ObjRef>({canonical_objref})); increment_ref_count(std::vector<ObjRef>({canonical_objref}));
} }
ClientContext context; ClientContext context;
AckReply reply; AckReply reply;
StartDeliveryRequest request; StartDeliveryRequest request;
request.set_objref(canonical_objref); request.set_objref(canonical_objref);
std::lock_guard<std::mutex> lock(objstores_lock_); auto objstores = objstores_.get();
request.set_objstore_address(objstores_[from].address); request.set_objstore_address((*objstores)[from].address);
objstores_[to].objstore_stub->StartDelivery(&context, request, &reply); (*objstores)[to].objstore_stub->StartDelivery(&context, request, &reply);
} }
void SchedulerService::schedule() { void SchedulerService::schedule() {
@ -328,7 +296,7 @@ void SchedulerService::schedule() {
// assign_task assumes that the canonical objrefs for its arguments are all ready, that is has_canonical_objref() is true for all of the call's arguments // assign_task assumes that the canonical objrefs for its arguments are all ready, that is has_canonical_objref() is true for all of the call's arguments
void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) { void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) {
ObjStoreId objstoreid = get_store(workerid); ObjStoreId objstoreid = get_store(workerid);
const Task& task = computation_graph_.get_task(operationid); const Task& task = computation_graph_.unsafe_get()->get_task(operationid);
ClientContext context; ClientContext context;
ExecuteTaskRequest request; ExecuteTaskRequest request;
ExecuteTaskReply reply; ExecuteTaskReply reply;
@ -337,26 +305,23 @@ void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) {
if (!task.arg(i).has_obj()) { if (!task.arg(i).has_obj()) {
ObjRef objref = task.arg(i).ref(); ObjRef objref = task.arg(i).ref();
ObjRef canonical_objref = get_canonical_objref(objref); ObjRef canonical_objref = get_canonical_objref(objref);
{
// Notify the relevant objstore about potential aliasing when it's ready // Notify the relevant objstore about potential aliasing when it's ready
std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_);
alias_notification_queue_.push_back(std::make_pair(objstoreid, std::make_pair(objref, canonical_objref))); alias_notification_queue_.get()->push_back(std::make_pair(objstoreid, std::make_pair(objref, canonical_objref)));
}
attempt_notify_alias(objstoreid, objref, canonical_objref); attempt_notify_alias(objstoreid, objref, canonical_objref);
RAY_LOG(RAY_DEBUG, "task contains object ref " << canonical_objref); RAY_LOG(RAY_DEBUG, "task contains object ref " << canonical_objref);
deliver_object_if_necessary(canonical_objref, pick_objstore(canonical_objref), objstoreid); deliver_object_if_necessary(canonical_objref, pick_objstore(canonical_objref), objstoreid);
} }
} }
{ {
std::lock_guard<std::mutex> workers_lock(workers_lock_); auto workers = workers_.get();
workers_[workerid].current_task = operationid; (*workers)[workerid].current_task = operationid;
request.mutable_task()->CopyFrom(task); // TODO(rkn): Is ownership handled properly here? request.mutable_task()->CopyFrom(task); // TODO(rkn): Is ownership handled properly here?
Status status = workers_[workerid].worker_stub->ExecuteTask(&context, request, &reply); Status status = (*workers)[workerid].worker_stub->ExecuteTask(&context, request, &reply);
} }
} }
bool SchedulerService::can_run(const Task& task) { bool SchedulerService::can_run(const Task& task) {
std::lock_guard<std::mutex> lock(objects_lock_); auto objtable = objtable_.get();
for (int i = 0; i < task.arg_size(); ++i) { for (int i = 0; i < task.arg_size(); ++i) {
if (!task.arg(i).has_obj()) { if (!task.arg(i).has_obj()) {
ObjRef objref = task.arg(i).ref(); ObjRef objref = task.arg(i).ref();
@ -364,7 +329,7 @@ bool SchedulerService::can_run(const Task& task) {
return false; return false;
} }
ObjRef canonical_objref = get_canonical_objref(objref); ObjRef canonical_objref = get_canonical_objref(objref);
if (canonical_objref >= objtable_.size() || objtable_[canonical_objref].size() == 0) { if (canonical_objref >= objtable->size() || (*objtable)[canonical_objref].size() == 0) {
return false; return false;
} }
} }
@ -377,9 +342,9 @@ std::pair<WorkerId, ObjStoreId> SchedulerService::register_worker(const std::str
ObjStoreId objstoreid = std::numeric_limits<size_t>::max(); ObjStoreId objstoreid = std::numeric_limits<size_t>::max();
// TODO: HACK: num_attempts is a hack // TODO: HACK: num_attempts is a hack
for (int num_attempts = 0; num_attempts < 5; ++num_attempts) { for (int num_attempts = 0; num_attempts < 5; ++num_attempts) {
std::lock_guard<std::mutex> lock(objstores_lock_); auto objstores = objstores_.get();
for (size_t i = 0; i < objstores_.size(); ++i) { for (size_t i = 0; i < objstores->size(); ++i) {
if (objstores_[i].address == objstore_address) { if ((*objstores)[i].address == objstore_address) {
objstoreid = i; objstoreid = i;
} }
} }
@ -390,15 +355,15 @@ std::pair<WorkerId, ObjStoreId> SchedulerService::register_worker(const std::str
RAY_CHECK_NEQ(objstoreid, std::numeric_limits<size_t>::max(), "object store with address " << objstore_address << " not yet registered"); RAY_CHECK_NEQ(objstoreid, std::numeric_limits<size_t>::max(), "object store with address " << objstore_address << " not yet registered");
WorkerId workerid; WorkerId workerid;
{ {
std::lock_guard<std::mutex> workers_lock(workers_lock_); auto workers = workers_.get();
workerid = workers_.size(); workerid = workers->size();
workers_.push_back(WorkerHandle()); workers->push_back(WorkerHandle());
auto channel = grpc::CreateChannel(worker_address, grpc::InsecureChannelCredentials()); auto channel = grpc::CreateChannel(worker_address, grpc::InsecureChannelCredentials());
workers_[workerid].channel = channel; (*workers)[workerid].channel = channel;
workers_[workerid].objstoreid = objstoreid; (*workers)[workerid].objstoreid = objstoreid;
workers_[workerid].worker_stub = WorkerService::NewStub(channel); (*workers)[workerid].worker_stub = WorkerService::NewStub(channel);
workers_[workerid].worker_address = worker_address; (*workers)[workerid].worker_address = worker_address;
workers_[workerid].current_task = NO_OPERATION; (*workers)[workerid].current_task = NO_OPERATION;
} }
return std::make_pair(workerid, objstoreid); return std::make_pair(workerid, objstoreid);
} }
@ -406,25 +371,25 @@ std::pair<WorkerId, ObjStoreId> SchedulerService::register_worker(const std::str
ObjRef SchedulerService::register_new_object() { ObjRef SchedulerService::register_new_object() {
// If we don't simultaneously lock objtable_ and target_objrefs_, we will probably get errors. // If we don't simultaneously lock objtable_ and target_objrefs_, we will probably get errors.
// TODO(rkn): increment/decrement_reference_count also acquire reference_counts_lock_ and target_objrefs_lock_ (through has_canonical_objref()), which caused deadlock in the past // TODO(rkn): increment/decrement_reference_count also acquire reference_counts_lock_ and target_objrefs_lock_ (through has_canonical_objref()), which caused deadlock in the past
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); auto reference_counts = reference_counts_.get();
std::lock_guard<std::mutex> contained_objrefs_lock(contained_objrefs_lock_); auto contained_objrefs = contained_objrefs_.get();
std::lock_guard<std::mutex> objects_lock(objects_lock_); auto objtable = objtable_.get();
std::lock_guard<std::mutex> target_objrefs_lock(target_objrefs_lock_); auto target_objrefs = target_objrefs_.get();
std::lock_guard<std::mutex> reverse_target_objrefs_lock(reverse_target_objrefs_lock_); auto reverse_target_objrefs = reverse_target_objrefs_.get();
ObjRef objtable_size = objtable_.size(); ObjRef objtable_size = objtable->size();
ObjRef target_objrefs_size = target_objrefs_.size(); ObjRef target_objrefs_size = target_objrefs->size();
ObjRef reverse_target_objrefs_size = reverse_target_objrefs_.size(); ObjRef reverse_target_objrefs_size = reverse_target_objrefs->size();
ObjRef reference_counts_size = reference_counts_.size(); ObjRef reference_counts_size = reference_counts->size();
ObjRef contained_objrefs_size = contained_objrefs_.size(); ObjRef contained_objrefs_size = contained_objrefs->size();
RAY_CHECK_EQ(objtable_size, target_objrefs_size, "objtable_ and target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and target_objrefs_.size() = " << target_objrefs_size); RAY_CHECK_EQ(objtable_size, target_objrefs_size, "objtable_ and target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and target_objrefs_.size() = " << target_objrefs_size);
RAY_CHECK_EQ(objtable_size, reverse_target_objrefs_size, "objtable_ and reverse_target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and reverse_target_objrefs_.size() = " << reverse_target_objrefs_size); RAY_CHECK_EQ(objtable_size, reverse_target_objrefs_size, "objtable_ and reverse_target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and reverse_target_objrefs_.size() = " << reverse_target_objrefs_size);
RAY_CHECK_EQ(objtable_size, reference_counts_size, "objtable_ and reference_counts_ should have the same size, but objtable_.size() = " << objtable_size << " and reference_counts_.size() = " << reference_counts_size); RAY_CHECK_EQ(objtable_size, reference_counts_size, "objtable_ and reference_counts_ should have the same size, but objtable_.size() = " << objtable_size << " and reference_counts_.size() = " << reference_counts_size);
RAY_CHECK_EQ(objtable_size, contained_objrefs_size, "objtable_ and contained_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and contained_objrefs_.size() = " << contained_objrefs_size); RAY_CHECK_EQ(objtable_size, contained_objrefs_size, "objtable_ and contained_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and contained_objrefs_.size() = " << contained_objrefs_size);
objtable_.push_back(std::vector<ObjStoreId>()); objtable->push_back(std::vector<ObjStoreId>());
target_objrefs_.push_back(UNITIALIZED_ALIAS); target_objrefs->push_back(UNITIALIZED_ALIAS);
reverse_target_objrefs_.push_back(std::vector<ObjRef>()); reverse_target_objrefs->push_back(std::vector<ObjRef>());
reference_counts_.push_back(0); reference_counts->push_back(0);
contained_objrefs_.push_back(std::vector<ObjRef>()); contained_objrefs->push_back(std::vector<ObjRef>());
{ {
// We increment once so the objref doesn't go out of scope before the ObjReady // We increment once so the objref doesn't go out of scope before the ObjReady
// method is called. The corresponding decrement will happen either in // method is called. The corresponding decrement will happen either in
@ -436,89 +401,88 @@ ObjRef SchedulerService::register_new_object() {
void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstoreid) { void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstoreid) {
// add_location must be called with a canonical objref // add_location must be called with a canonical objref
{
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_);
RAY_CHECK_NEQ(reference_counts_[canonical_objref], DEALLOCATED, "Calling ObjReady with canonical_objref " << canonical_objref << ", but this objref has already been deallocated"); RAY_CHECK_NEQ((*reference_counts_.get())[canonical_objref], DEALLOCATED, "Calling ObjReady with canonical_objref " << canonical_objref << ", but this objref has already been deallocated");
}
RAY_CHECK(is_canonical(canonical_objref), "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")"); RAY_CHECK(is_canonical(canonical_objref), "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")");
std::lock_guard<std::mutex> objects_lock(objects_lock_); auto objtable = objtable_.get();
RAY_CHECK_LT(canonical_objref, objtable_.size(), "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")"); RAY_CHECK_LT(canonical_objref, objtable->size(), "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")");
// do a binary search // do a binary search
auto &objstores = objtable_[canonical_objref]; auto &locations = (*objtable)[canonical_objref];
auto pos = std::lower_bound(objstores.begin(), objstores.end(), objstoreid); auto pos = std::lower_bound(locations.begin(), locations.end(), objstoreid);
if (pos == objstores.end() || objstoreid < *pos) { if (pos == locations.end() || objstoreid < *pos) {
objstores.insert(pos, objstoreid); locations.insert(pos, objstoreid);
} }
auto &objects_in_flight = objects_in_transit_[objstoreid]; auto &objects_in_flight = objects_in_transit_[objstoreid];
objects_in_flight.erase(std::remove(objects_in_flight.begin(), objects_in_flight.end(), canonical_objref), objects_in_flight.end()); objects_in_flight.erase(std::remove(objects_in_flight.begin(), objects_in_flight.end(), canonical_objref), objects_in_flight.end());
} }
void SchedulerService::add_canonical_objref(ObjRef objref) { void SchedulerService::add_canonical_objref(ObjRef objref) {
std::lock_guard<std::mutex> lock(target_objrefs_lock_); auto target_objrefs = target_objrefs_.get();
RAY_CHECK_LT(objref, target_objrefs_.size(), "internal error: attempting to insert objref " << objref << " in target_objrefs_, but target_objrefs_.size() is " << target_objrefs_.size()); RAY_CHECK_LT(objref, target_objrefs->size(), "internal error: attempting to insert objref " << objref << " in target_objrefs_, but target_objrefs_.size() is " << target_objrefs->size());
RAY_CHECK(target_objrefs_[objref] == UNITIALIZED_ALIAS || target_objrefs_[objref] == objref, "internal error: attempting to declare objref " << objref << " as a canonical objref, but target_objrefs_[objref] is already aliased with objref " << target_objrefs_[objref]); RAY_CHECK((*target_objrefs)[objref] == UNITIALIZED_ALIAS || (*target_objrefs)[objref] == objref, "internal error: attempting to declare objref " << objref << " as a canonical objref, but target_objrefs_[objref] is already aliased with objref " << (*target_objrefs)[objref]);
target_objrefs_[objref] = objref; (*target_objrefs)[objref] = objref;
} }
ObjStoreId SchedulerService::get_store(WorkerId workerid) { ObjStoreId SchedulerService::get_store(WorkerId workerid) {
std::lock_guard<std::mutex> lock(workers_lock_); auto workers = workers_.get();
ObjStoreId result = workers_[workerid].objstoreid; ObjStoreId result = (*workers)[workerid].objstoreid;
return result; return result;
} }
void SchedulerService::register_function(const std::string& name, WorkerId workerid, size_t num_return_vals) { void SchedulerService::register_function(const std::string& name, WorkerId workerid, size_t num_return_vals) {
std::lock_guard<std::mutex> lock(fntable_lock_); auto fntable = fntable_.get();
FnInfo& info = fntable_[name]; FnInfo& info = (*fntable)[name];
info.set_num_return_vals(num_return_vals); info.set_num_return_vals(num_return_vals);
info.add_worker(workerid); info.add_worker(workerid);
} }
void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerInfoReply* reply) { void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerInfoReply* reply) {
acquire_all_locks(); acquire_all_locks();
auto reference_counts = reference_counts_.unsafe_get();
for (int i = 0; i < reference_counts_.size(); ++i) { for (int i = 0; i < reference_counts->size(); ++i) {
reply->add_reference_count(reference_counts_[i]); reply->add_reference_count((*reference_counts)[i]);
} }
auto target_objrefs = target_objrefs_.unsafe_get();
for (int i = 0; i < target_objrefs_.size(); ++i) { for (int i = 0; i < target_objrefs->size(); ++i) {
reply->add_target_objref(target_objrefs_[i]); reply->add_target_objref((*target_objrefs)[i]);
} }
auto function_table = reply->mutable_function_table(); auto function_table = reply->mutable_function_table();
for (const auto& entry : fntable_) { for (const auto& entry : *fntable_.unsafe_get()) {
(*function_table)[entry.first].set_num_return_vals(entry.second.num_return_vals()); (*function_table)[entry.first].set_num_return_vals(entry.second.num_return_vals());
for (const WorkerId& worker : entry.second.workers()) { for (const WorkerId& worker : entry.second.workers()) {
(*function_table)[entry.first].add_workerid(worker); (*function_table)[entry.first].add_workerid(worker);
} }
} }
for (const auto& entry : task_queue_) { for (const auto& entry : *task_queue_.unsafe_get()) {
reply->add_operationid(entry); reply->add_operationid(entry);
} }
for (const WorkerId& entry : avail_workers_) { for (const WorkerId& entry : *avail_workers_.unsafe_get()) {
reply->add_avail_worker(entry); reply->add_avail_worker(entry);
} }
computation_graph_.to_protobuf(reply->mutable_computation_graph()); computation_graph_.unsafe_get()->to_protobuf(reply->mutable_computation_graph());
release_all_locks(); release_all_locks();
} }
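get_info is the one place that reads every container at once: it takes all the locks up front through acquire_all_locks() and then uses unsafe_get() so the handles do not try to re-lock mutexes that are already held. The body of do_on_locks is cut off at the end of this page, but the acquire-in-order / release-in-reverse pattern it implies looks roughly like the following sketch (mutex names are illustrative assumptions):

#include <mutex>

std::mutex reference_counts_mutex;
std::mutex objtable_mutex;
std::mutex target_objrefs_mutex;

// One array fixes the global acquisition order.
std::mutex* all_mutexes[] = {&reference_counts_mutex, &objtable_mutex, &target_objrefs_mutex};
const int kNumMutexes = sizeof(all_mutexes) / sizeof(all_mutexes[0]);

void acquire_all_locks() {
  for (int i = 0; i < kNumMutexes; ++i) {
    all_mutexes[i]->lock();
  }
}

void release_all_locks() {
  for (int i = kNumMutexes - 1; i >= 0; --i) {
    all_mutexes[i]->unlock();
  }
}

int main() {
  acquire_all_locks();
  // ... read every container via unsafe_get()-style raw access here ...
  release_all_locks();
  return 0;
}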
// pick_objstore assumes that objects_lock_ has been acquired
// pick_objstore must be called with a canonical_objref // pick_objstore must be called with a canonical_objref
ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) { ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) {
std::mt19937 rng; std::mt19937 rng;
RAY_CHECK(is_canonical(canonical_objref), "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")"); RAY_CHECK(is_canonical(canonical_objref), "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")");
auto objtable = objtable_.get();
std::uniform_int_distribution<int> uni(0, objtable_[canonical_objref].size() - 1); std::uniform_int_distribution<int> uni(0, (*objtable)[canonical_objref].size() - 1);
ObjStoreId objstoreid = objtable_[canonical_objref][uni(rng)]; ObjStoreId objstoreid = (*objtable)[canonical_objref][uni(rng)];
return objstoreid; return objstoreid;
} }
bool SchedulerService::is_canonical(ObjRef objref) { bool SchedulerService::is_canonical(ObjRef objref) {
std::lock_guard<std::mutex> lock(target_objrefs_lock_); auto target_objrefs = target_objrefs_.get();
RAY_CHECK_NEQ(target_objrefs_[objref], UNITIALIZED_ALIAS, "Attempting to call is_canonical on an objref for which aliasing is not complete or the object is not ready, target_objrefs_[objref] == UNITIALIZED_ALIAS for objref " << objref << "."); RAY_CHECK_NEQ((*target_objrefs)[objref], UNITIALIZED_ALIAS, "Attempting to call is_canonical on an objref for which aliasing is not complete or the object is not ready, target_objrefs_[objref] == UNITIALIZED_ALIAS for objref " << objref << ".");
return objref == target_objrefs_[objref]; return objref == (*target_objrefs)[objref];
} }
void SchedulerService::perform_gets() { void SchedulerService::perform_gets() {
std::lock_guard<std::mutex> get_queue_lock(get_queue_lock_); auto get_queue = get_queue_.get();
// Complete all get tasks that can be completed. // Complete all get tasks that can be completed.
for (int i = 0; i < get_queue_.size(); ++i) { for (int i = 0; i < get_queue->size(); ++i) {
const std::pair<WorkerId, ObjRef>& get = get_queue_[i]; const std::pair<WorkerId, ObjRef>& get = (*get_queue)[i];
ObjRef objref = get.second; ObjRef objref = get.second;
WorkerId workerid = get.first; WorkerId workerid = get.first;
ObjStoreId objstoreid = get_store(workerid); ObjStoreId objstoreid = get_store(workerid);
@ -528,46 +492,39 @@ void SchedulerService::perform_gets() {
} }
ObjRef canonical_objref = get_canonical_objref(objref); ObjRef canonical_objref = get_canonical_objref(objref);
RAY_LOG(RAY_DEBUG, "attempting to get objref " << get.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid)); RAY_LOG(RAY_DEBUG, "attempting to get objref " << get.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid));
int num_stores; int num_stores = (*objtable_.get())[canonical_objref].size();
{
std::lock_guard<std::mutex> objects_lock(objects_lock_);
num_stores = objtable_[canonical_objref].size();
}
if (num_stores > 0) { if (num_stores > 0) {
deliver_object_if_necessary(canonical_objref, pick_objstore(canonical_objref), objstoreid); deliver_object_if_necessary(canonical_objref, pick_objstore(canonical_objref), objstoreid);
{
// Notify the relevant objstore about potential aliasing when it's ready // Notify the relevant objstore about potential aliasing when it's ready
std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_);
alias_notification_queue_.push_back(std::make_pair(get_store(workerid), std::make_pair(objref, canonical_objref))); alias_notification_queue_.get()->push_back(std::make_pair(get_store(workerid), std::make_pair(objref, canonical_objref)));
}
// Remove the get task from the queue // Remove the get task from the queue
std::swap(get_queue_[i], get_queue_[get_queue_.size() - 1]); std::swap((*get_queue)[i], (*get_queue)[get_queue->size() - 1]);
get_queue_.pop_back(); get_queue->pop_back();
i -= 1; i -= 1;
} }
} }
} }
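perform_gets, and the two scheduling loops below, remove a serviced entry from the middle of a vector with the swap-and-pop idiom: swap it with the last element, pop_back, and step i back by one so the element that was just swapped into slot i is still examined. A standalone demo:

#include <cstdio>
#include <utility>
#include <vector>

int main() {
  std::vector<int> queue;
  for (int v = 1; v <= 6; ++v) queue.push_back(v);
  for (int i = 0; i < static_cast<int>(queue.size()); ++i) {
    bool serviced = (queue[i] % 2 == 0);             // stand-in for "this request can be completed"
    if (serviced) {
      std::swap(queue[i], queue[queue.size() - 1]);  // O(1) removal, order not preserved
      queue.pop_back();
      i -= 1;                                        // re-examine the element now at slot i
    }
  }
  for (size_t i = 0; i < queue.size(); ++i) {
    std::printf("%d ", queue[i]);                    // prints the still-pending (odd) entries
  }
  std::printf("\n");
  return 0;
}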
void SchedulerService::schedule_tasks_naively() { void SchedulerService::schedule_tasks_naively() {
std::lock_guard<std::mutex> computation_graph_lock(computation_graph_lock_); auto computation_graph = computation_graph_.get();
std::lock_guard<std::mutex> fntable_lock(fntable_lock_); auto fntable = fntable_.get();
std::lock_guard<std::mutex> avail_workers_lock(avail_workers_lock_); auto avail_workers = avail_workers_.get();
std::lock_guard<std::mutex> task_queue_lock(task_queue_lock_); auto task_queue = task_queue_.get();
for (int i = 0; i < avail_workers_.size(); ++i) { for (int i = 0; i < avail_workers->size(); ++i) {
// Submit all tasks whose arguments are ready. // Submit all tasks whose arguments are ready.
WorkerId workerid = avail_workers_[i]; WorkerId workerid = (*avail_workers)[i];
for (auto it = task_queue_.begin(); it != task_queue_.end(); ++it) { for (auto it = task_queue->begin(); it != task_queue->end(); ++it) {
// The use of erase(it) below invalidates the iterator, but we // The use of erase(it) below invalidates the iterator, but we
// immediately break out of the inner loop, so the iterator is not used // immediately break out of the inner loop, so the iterator is not used
// after the erase // after the erase
const OperationId operationid = *it; const OperationId operationid = *it;
const Task& task = computation_graph_.get_task(operationid); const Task& task = computation_graph->get_task(operationid);
auto& workers = fntable_[task.name()].workers(); auto& workers = (*fntable)[task.name()].workers();
if (std::binary_search(workers.begin(), workers.end(), workerid) && can_run(task)) { if (std::binary_search(workers.begin(), workers.end(), workerid) && can_run(task)) {
assign_task(operationid, workerid); assign_task(operationid, workerid);
task_queue_.erase(it); task_queue->erase(it);
std::swap(avail_workers_[i], avail_workers_[avail_workers_.size() - 1]); std::swap((*avail_workers)[i], (*avail_workers)[avail_workers->size() - 1]);
avail_workers_.pop_back(); avail_workers->pop_back();
i -= 1; i -= 1;
break; break;
} }
@ -576,20 +533,20 @@ void SchedulerService::schedule_tasks_naively() {
} }
void SchedulerService::schedule_tasks_location_aware() { void SchedulerService::schedule_tasks_location_aware() {
std::lock_guard<std::mutex> computation_graph_lock(computation_graph_lock_); auto computation_graph = computation_graph_.get();
std::lock_guard<std::mutex> fntable_lock(fntable_lock_); auto fntable = fntable_.get();
std::lock_guard<std::mutex> avail_workers_lock(avail_workers_lock_); auto avail_workers = avail_workers_.get();
std::lock_guard<std::mutex> task_queue_lock(task_queue_lock_); auto task_queue = task_queue_.get();
for (int i = 0; i < avail_workers_.size(); ++i) { for (int i = 0; i < avail_workers->size(); ++i) {
// Submit all tasks whose arguments are ready. // Submit all tasks whose arguments are ready.
WorkerId workerid = avail_workers_[i]; WorkerId workerid = (*avail_workers)[i];
ObjStoreId objstoreid = get_store(workerid); ObjStoreId objstoreid = get_store(workerid);
auto bestit = task_queue_.end(); // keep track of the task that fits the worker best so far auto bestit = task_queue->end(); // keep track of the task that fits the worker best so far
size_t min_num_shipped_objects = std::numeric_limits<size_t>::max(); // number of objects that need to be transfered for this worker size_t min_num_shipped_objects = std::numeric_limits<size_t>::max(); // number of objects that need to be transfered for this worker
for (auto it = task_queue_.begin(); it != task_queue_.end(); ++it) { for (auto it = task_queue->begin(); it != task_queue->end(); ++it) {
OperationId operationid = *it; OperationId operationid = *it;
const Task& task = computation_graph_.get_task(operationid); const Task& task = computation_graph->get_task(operationid);
auto& workers = fntable_[task.name()].workers(); auto& workers = (*fntable)[task.name()].workers();
if (std::binary_search(workers.begin(), workers.end(), workerid) && can_run(task)) { if (std::binary_search(workers.begin(), workers.end(), workerid) && can_run(task)) {
// determine how many objects would need to be shipped // determine how many objects would need to be shipped
size_t num_shipped_objects = 0; size_t num_shipped_objects = 0;
@ -600,8 +557,8 @@ void SchedulerService::schedule_tasks_location_aware() {
ObjRef canonical_objref = get_canonical_objref(objref); ObjRef canonical_objref = get_canonical_objref(objref);
{ {
// check if the object is already in the local object store // check if the object is already in the local object store
std::lock_guard<std::mutex> objects_lock(objects_lock_); auto objtable = objtable_.get();
if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) { if (!std::binary_search((*objtable)[canonical_objref].begin(), (*objtable)[canonical_objref].end(), objstoreid)) {
num_shipped_objects += 1; num_shipped_objects += 1;
} }
} }
@ -614,58 +571,58 @@ void SchedulerService::schedule_tasks_location_aware() {
} }
} }
// if we found a suitable task // if we found a suitable task
if (bestit != task_queue_.end()) { if (bestit != task_queue->end()) {
assign_task(*bestit, workerid); assign_task(*bestit, workerid);
task_queue_.erase(bestit); task_queue->erase(bestit);
std::swap(avail_workers_[i], avail_workers_[avail_workers_.size() - 1]); std::swap((*avail_workers)[i], (*avail_workers)[avail_workers->size() - 1]);
avail_workers_.pop_back(); avail_workers->pop_back();
i -= 1; i -= 1;
} }
} }
} }
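schedule_tasks_location_aware picks, for each free worker, the runnable task whose arguments would require the fewest object transfers to that worker's object store. The core cost computation, reduced to a standalone sketch with made-up data:

#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
  // objtable[objref] = sorted list of objstore ids that already hold that object
  std::vector<std::vector<int>> objtable = {{0}, {0, 1}, {1}};
  // each candidate task is just the list of objrefs it needs as arguments
  std::vector<std::vector<int>> tasks = {{0, 1}, {1, 2}};
  int objstoreid = 1;                          // the free worker's local object store
  int best_task = -1;
  int min_num_shipped_objects = 1 << 30;       // "infinity" sentinel
  for (int t = 0; t < static_cast<int>(tasks.size()); ++t) {
    int num_shipped_objects = 0;
    for (size_t a = 0; a < tasks[t].size(); ++a) {
      const std::vector<int>& locations = objtable[tasks[t][a]];
      if (!std::binary_search(locations.begin(), locations.end(), objstoreid)) {
        num_shipped_objects += 1;              // this argument would have to be shipped
      }
    }
    if (num_shipped_objects < min_num_shipped_objects) {
      min_num_shipped_objects = num_shipped_objects;
      best_task = t;
    }
  }
  std::printf("assign task %d, shipping %d objects\n", best_task, min_num_shipped_objects);
  return 0;
}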
void SchedulerService::perform_notify_aliases() { void SchedulerService::perform_notify_aliases() {
std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_); auto alias_notification_queue = alias_notification_queue_.get();
for (int i = 0; i < alias_notification_queue_.size(); ++i) { for (int i = 0; i < alias_notification_queue->size(); ++i) {
const std::pair<WorkerId, std::pair<ObjRef, ObjRef> > alias_notification = alias_notification_queue_[i]; const std::pair<WorkerId, std::pair<ObjRef, ObjRef> > alias_notification = (*alias_notification_queue)[i];
ObjStoreId objstoreid = alias_notification.first; ObjStoreId objstoreid = alias_notification.first;
ObjRef alias_objref = alias_notification.second.first; ObjRef alias_objref = alias_notification.second.first;
ObjRef canonical_objref = alias_notification.second.second; ObjRef canonical_objref = alias_notification.second.second;
if (attempt_notify_alias(objstoreid, alias_objref, canonical_objref)) { // this locks both the objstore_ and objtable_ if (attempt_notify_alias(objstoreid, alias_objref, canonical_objref)) { // this locks both the objstore_ and objtable_
// the attempt to notify the objstore of the objref aliasing succeeded, so remove the notification task from the queue // the attempt to notify the objstore of the objref aliasing succeeded, so remove the notification task from the queue
std::swap(alias_notification_queue_[i], alias_notification_queue_[alias_notification_queue_.size() - 1]); std::swap((*alias_notification_queue)[i], (*alias_notification_queue)[alias_notification_queue->size() - 1]);
alias_notification_queue_.pop_back(); alias_notification_queue->pop_back();
i -= 1; i -= 1;
} }
} }
} }
bool SchedulerService::has_canonical_objref(ObjRef objref) { bool SchedulerService::has_canonical_objref(ObjRef objref) {
std::lock_guard<std::mutex> lock(target_objrefs_lock_); auto target_objrefs = target_objrefs_.get();
ObjRef objref_temp = objref; ObjRef objref_temp = objref;
while (true) { while (true) {
RAY_CHECK_LT(objref_temp, target_objrefs_.size(), "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); RAY_CHECK_LT(objref_temp, target_objrefs->size(), "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs->size());
if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) { if ((*target_objrefs)[objref_temp] == UNITIALIZED_ALIAS) {
return false; return false;
} }
if (target_objrefs_[objref_temp] == objref_temp) { if ((*target_objrefs)[objref_temp] == objref_temp) {
return true; return true;
} }
objref_temp = target_objrefs_[objref_temp]; objref_temp = (*target_objrefs)[objref_temp];
} }
} }
ObjRef SchedulerService::get_canonical_objref(ObjRef objref) { ObjRef SchedulerService::get_canonical_objref(ObjRef objref) {
// get_canonical_objref assumes that has_canonical_objref(objref) is true // get_canonical_objref assumes that has_canonical_objref(objref) is true
std::lock_guard<std::mutex> lock(target_objrefs_lock_); auto target_objrefs = target_objrefs_.get();
ObjRef objref_temp = objref; ObjRef objref_temp = objref;
while (true) { while (true) {
RAY_CHECK_LT(objref_temp, target_objrefs_.size(), "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); RAY_CHECK_LT(objref_temp, target_objrefs->size(), "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs->size());
RAY_CHECK_NEQ(target_objrefs_[objref_temp], UNITIALIZED_ALIAS, "Attempting to get canonical objref for objref " << objref << ", which aliases, objref " << objref_temp << ", but target_objrefs_[objref_temp] == UNITIALIZED_ALIAS for objref_temp = " << objref_temp << "."); RAY_CHECK_NEQ((*target_objrefs)[objref_temp], UNITIALIZED_ALIAS, "Attempting to get canonical objref for objref " << objref << ", which aliases, objref " << objref_temp << ", but target_objrefs_[objref_temp] == UNITIALIZED_ALIAS for objref_temp = " << objref_temp << ".");
if (target_objrefs_[objref_temp] == objref_temp) { if ((*target_objrefs)[objref_temp] == objref_temp) {
return objref_temp; return objref_temp;
} }
objref_temp = target_objrefs_[objref_temp]; objref_temp = (*target_objrefs)[objref_temp];
RAY_LOG(RAY_ALIAS, "Looping in get_canonical_objref."); RAY_LOG(RAY_ALIAS, "Looping in get_canonical_objref.");
} }
} }
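has_canonical_objref and get_canonical_objref both walk the target_objrefs_ chain until they reach an objref that maps to itself (canonical) or an uninitialized entry. The same walk in isolation, with -1 standing in for UNITIALIZED_ALIAS:

#include <cstdio>
#include <vector>

const int kUninitializedAlias = -1;   // stand-in for UNITIALIZED_ALIAS

// Returns the canonical objref, or kUninitializedAlias if aliasing is not complete yet.
int get_canonical(const std::vector<int>& target_objrefs, int objref) {
  while (target_objrefs[objref] != objref) {
    if (target_objrefs[objref] == kUninitializedAlias) {
      return kUninitializedAlias;
    }
    objref = target_objrefs[objref];
  }
  return objref;  // maps to itself, so it is canonical
}

int main() {
  // 2 is canonical; 1 aliases 2; 0 aliases 1; 3 has not been aliased yet.
  std::vector<int> target_objrefs = {1, 2, 2, kUninitializedAlias};
  std::printf("canonical of 0: %d\n", get_canonical(target_objrefs, 0));  // 2
  std::printf("canonical of 3: %d\n", get_canonical(target_objrefs, 3));  // -1
  return 0;
}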
@ -677,8 +634,8 @@ bool SchedulerService::attempt_notify_alias(ObjStoreId objstoreid, ObjRef alias_
return true; return true;
} }
{ {
std::lock_guard<std::mutex> lock(objects_lock_); auto objtable = objtable_.get();
if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) { if (!std::binary_search((*objtable)[canonical_objref].begin(), (*objtable)[canonical_objref].end(), objstoreid)) {
// the objstore doesn't have the object for canonical_objref yet, so it's too early to notify the objstore about the alias // the objstore doesn't have the object for canonical_objref yet, so it's too early to notify the objstore about the alias
return false; return false;
} }
@ -688,10 +645,7 @@ bool SchedulerService::attempt_notify_alias(ObjStoreId objstoreid, ObjRef alias_
NotifyAliasRequest request; NotifyAliasRequest request;
request.set_alias_objref(alias_objref); request.set_alias_objref(alias_objref);
request.set_canonical_objref(canonical_objref); request.set_canonical_objref(canonical_objref);
{
std::lock_guard<std::mutex> objstores_lock(objstores_lock_);
objstores_[objstoreid].objstore_stub->NotifyAlias(&context, request, &reply); (*objstores_.get())[objstoreid].objstore_stub->NotifyAlias(&context, request, &reply);
}
return true; return true;
} }
@ -700,50 +654,56 @@ void SchedulerService::deallocate_object(ObjRef canonical_objref) {
// deallocate_object also recursively calls decrement_ref_count). Both of // deallocate_object also recursively calls decrement_ref_count). Both of
// these methods require reference_counts_lock_ to have been acquired, and // these methods require reference_counts_lock_ to have been acquired, and
// so the lock must before outside of these methods (it is acquired in // so the lock must before outside of these methods (it is acquired in
// DecrementRefCount). // DecrementRefCount). Because we use contained_objrefs_ in this method, we
// also require contained_objrefs_lock_ to be acquired outside of
// decrement_ref_count.
RAY_LOG(RAY_REFCOUNT, "Deallocating canonical_objref " << canonical_objref << "."); RAY_LOG(RAY_REFCOUNT, "Deallocating canonical_objref " << canonical_objref << ".");
{ {
std::lock_guard<std::mutex> objects_lock(objects_lock_); auto objtable = objtable_.get();
auto &objstores = objtable_[canonical_objref]; auto &locations = (*objtable)[canonical_objref];
std::lock_guard<std::mutex> objstores_lock(objstores_lock_); // TODO(rkn): Should this be inside the for loop instead? auto objstores = objstores_.get(); // TODO(rkn): Should this be inside the for loop instead?
for (int i = 0; i < objstores.size(); ++i) { for (int i = 0; i < locations.size(); ++i) {
ClientContext context; ClientContext context;
AckReply reply; AckReply reply;
DeallocateObjectRequest request; DeallocateObjectRequest request;
request.set_canonical_objref(canonical_objref); request.set_canonical_objref(canonical_objref);
ObjStoreId objstoreid = objstores[i]; ObjStoreId objstoreid = locations[i];
RAY_LOG(RAY_REFCOUNT, "Attempting to deallocate canonical_objref " << canonical_objref << " from objstore " << objstoreid); RAY_LOG(RAY_REFCOUNT, "Attempting to deallocate canonical_objref " << canonical_objref << " from objstore " << objstoreid);
objstores_[objstoreid].objstore_stub->DeallocateObject(&context, request, &reply); (*objstores)[objstoreid].objstore_stub->DeallocateObject(&context, request, &reply);
} }
objtable_[canonical_objref].clear(); locations.clear();
} }
decrement_ref_count(contained_objrefs_[canonical_objref]); decrement_ref_count((*contained_objrefs_.unsafe_get())[canonical_objref]);
} }
void SchedulerService::increment_ref_count(const std::vector<ObjRef> &objrefs) { void SchedulerService::increment_ref_count(const std::vector<ObjRef> &objrefs) {
// increment_ref_count assumes that reference_counts_lock_ has been acquired already // increment_ref_count assumes that reference_counts_lock_ has been acquired already
for (int i = 0; i < objrefs.size(); ++i) { for (int i = 0; i < objrefs.size(); ++i) {
ObjRef objref = objrefs[i]; ObjRef objref = objrefs[i];
auto reference_counts = reference_counts_.unsafe_get();
RAY_CHECK_NEQ(reference_counts_[objref], DEALLOCATED, "Attempting to increment the reference count for objref " << objref << ", but this object appears to have been deallocated already."); RAY_CHECK_NEQ((*reference_counts)[objref], DEALLOCATED, "Attempting to increment the reference count for objref " << objref << ", but this object appears to have been deallocated already.");
reference_counts_[objref] += 1; (*reference_counts)[objref] += 1;
RAY_LOG(RAY_REFCOUNT, "Incremented ref count for objref " << objref <<". New reference count is " << reference_counts_[objref]); RAY_LOG(RAY_REFCOUNT, "Incremented ref count for objref " << objref <<". New reference count is " << (*reference_counts)[objref]);
} }
} }
void SchedulerService::decrement_ref_count(const std::vector<ObjRef> &objrefs) { void SchedulerService::decrement_ref_count(const std::vector<ObjRef> &objrefs) {
// decrement_ref_count assumes that reference_counts_lock_ has been acquired already // decrement_ref_count assumes that reference_counts_lock_ and
// contained_objrefs_lock_ have been acquired already. contained_objrefs_lock_
// is needed inside of deallocate_object
for (int i = 0; i < objrefs.size(); ++i) { for (int i = 0; i < objrefs.size(); ++i) {
ObjRef objref = objrefs[i]; ObjRef objref = objrefs[i];
auto reference_counts = reference_counts_.unsafe_get();
RAY_CHECK_NEQ(reference_counts_[objref], DEALLOCATED, "Attempting to decrement the reference count for objref " << objref << ", but this object appears to have been deallocated already."); RAY_CHECK_NEQ((*reference_counts)[objref], DEALLOCATED, "Attempting to decrement the reference count for objref " << objref << ", but this object appears to have been deallocated already.");
RAY_CHECK_NEQ(reference_counts_[objref], 0, "Attempting to decrement the reference count for objref " << objref << ", but the reference count for this object is already 0."); RAY_CHECK_NEQ((*reference_counts)[objref], 0, "Attempting to decrement the reference count for objref " << objref << ", but the reference count for this object is already 0.");
reference_counts_[objref] -= 1; (*reference_counts)[objref] -= 1;
RAY_LOG(RAY_REFCOUNT, "Decremented ref count for objref " << objref << ". New reference count is " << reference_counts_[objref]); RAY_LOG(RAY_REFCOUNT, "Decremented ref count for objref " << objref << ". New reference count is " << (*reference_counts)[objref]);
// See if we can deallocate the object // See if we can deallocate the object
std::vector<ObjRef> equivalent_objrefs; std::vector<ObjRef> equivalent_objrefs;
get_equivalent_objrefs(objref, equivalent_objrefs); get_equivalent_objrefs(objref, equivalent_objrefs);
bool can_deallocate = true; bool can_deallocate = true;
for (int j = 0; j < equivalent_objrefs.size(); ++j) { for (int j = 0; j < equivalent_objrefs.size(); ++j) {
if (reference_counts_[equivalent_objrefs[j]] != 0) { if ((*reference_counts)[equivalent_objrefs[j]] != 0) {
can_deallocate = false; can_deallocate = false;
break; break;
} }
@ -753,7 +713,7 @@ void SchedulerService::decrement_ref_count(const std::vector<ObjRef> &objrefs) {
RAY_CHECK(is_canonical(canonical_objref), "canonical_objref is not canonical."); RAY_CHECK(is_canonical(canonical_objref), "canonical_objref is not canonical.");
deallocate_object(canonical_objref); deallocate_object(canonical_objref);
for (int j = 0; j < equivalent_objrefs.size(); ++j) { for (int j = 0; j < equivalent_objrefs.size(); ++j) {
reference_counts_[equivalent_objrefs[j]] = DEALLOCATED; (*reference_counts)[equivalent_objrefs[j]] = DEALLOCATED;
} }
} }
} }
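For readers following the reference-counting logic above, here is a minimal, self-contained sketch (illustrative only, not the scheduler's code; the name equivalence_classes is made up) of the pattern: a count is kept per objref, an object is deallocated only once every objref in its alias-equivalence class has dropped to zero, and the whole class is then marked with the DEALLOCATED sentinel.

#include <cstddef>
#include <cstdint>
#include <vector>

typedef std::size_t ObjRef;
typedef std::uint64_t RefCount;
const RefCount DEALLOCATED = static_cast<RefCount>(-1);

// Decrement the count of each objref passed in; deallocate only when every
// objref in its alias-equivalence class has reached zero, then mark the
// whole class with the DEALLOCATED sentinel.
void decrement_ref_count_sketch(std::vector<RefCount>& counts,
                                const std::vector<std::vector<ObjRef> >& equivalence_classes,
                                const std::vector<ObjRef>& objrefs) {
  for (std::size_t i = 0; i < objrefs.size(); ++i) {
    ObjRef objref = objrefs[i];
    counts[objref] -= 1;
    const std::vector<ObjRef>& eq = equivalence_classes[objref];
    bool can_deallocate = true;
    for (std::size_t j = 0; j < eq.size(); ++j) {
      if (counts[eq[j]] != 0) { can_deallocate = false; break; }
    }
    if (can_deallocate) {
      // In the scheduler, deallocate_object(canonical_objref) would run here.
      for (std::size_t j = 0; j < eq.size(); ++j) {
        counts[eq[j]] = DEALLOCATED;
      }
    }
  }
}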
@ -762,40 +722,41 @@ void SchedulerService::decrement_ref_count(const std::vector<ObjRef> &objrefs) {
void SchedulerService::upstream_objrefs(ObjRef objref, std::vector<ObjRef> &objrefs) { void SchedulerService::upstream_objrefs(ObjRef objref, std::vector<ObjRef> &objrefs) {
// upstream_objrefs assumes that the lock reverse_target_objrefs_lock_ has been acquired // upstream_objrefs assumes that the lock reverse_target_objrefs_lock_ has been acquired
objrefs.push_back(objref); objrefs.push_back(objref);
for (int i = 0; i < reverse_target_objrefs_[objref].size(); ++i) { auto reverse_target_objrefs = reverse_target_objrefs_.unsafe_get();
upstream_objrefs(reverse_target_objrefs_[objref][i], objrefs); for (int i = 0; i < (*reverse_target_objrefs)[objref].size(); ++i) {
upstream_objrefs((*reverse_target_objrefs)[objref][i], objrefs);
} }
} }
void SchedulerService::get_equivalent_objrefs(ObjRef objref, std::vector<ObjRef> &equivalent_objrefs) { void SchedulerService::get_equivalent_objrefs(ObjRef objref, std::vector<ObjRef> &equivalent_objrefs) {
std::lock_guard<std::mutex> target_objrefs_lock(target_objrefs_lock_); auto target_objrefs = target_objrefs_.get();
ObjRef downstream_objref = objref; ObjRef downstream_objref = objref;
while (target_objrefs_[downstream_objref] != downstream_objref && target_objrefs_[downstream_objref] != UNITIALIZED_ALIAS) { while ((*target_objrefs)[downstream_objref] != downstream_objref && (*target_objrefs)[downstream_objref] != UNITIALIZED_ALIAS) {
RAY_LOG(RAY_ALIAS, "Looping in get_equivalent_objrefs"); RAY_LOG(RAY_ALIAS, "Looping in get_equivalent_objrefs");
downstream_objref = target_objrefs_[downstream_objref]; downstream_objref = (*target_objrefs)[downstream_objref];
} }
std::lock_guard<std::mutex> reverse_target_objrefs_lock(reverse_target_objrefs_lock_); auto reverse_target_objrefs = reverse_target_objrefs_.get();
upstream_objrefs(downstream_objref, equivalent_objrefs); upstream_objrefs(downstream_objref, equivalent_objrefs);
} }
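To make the alias handling above concrete: resolution first walks target links downstream until it reaches the canonical objref (one that maps to itself), then recursively collects every upstream alias. A small illustrative sketch, with hypothetical free functions standing in for the member functions and their Synchronized state:

#include <cstddef>
#include <vector>

typedef std::size_t ObjRef;
const ObjRef UNITIALIZED_ALIAS = static_cast<ObjRef>(-1);

// Collect objref and everything that (transitively) aliases it.
void collect_upstream(const std::vector<std::vector<ObjRef> >& reverse_targets,
                      ObjRef objref, std::vector<ObjRef>& out) {
  out.push_back(objref);
  for (std::size_t i = 0; i < reverse_targets[objref].size(); ++i) {
    collect_upstream(reverse_targets, reverse_targets[objref][i], out);
  }
}

// Walk target links downstream to the canonical objref, then gather its
// whole equivalence class.
std::vector<ObjRef> equivalent_objrefs_sketch(const std::vector<ObjRef>& targets,
                                              const std::vector<std::vector<ObjRef> >& reverse_targets,
                                              ObjRef objref) {
  while (targets[objref] != objref && targets[objref] != UNITIALIZED_ALIAS) {
    objref = targets[objref];  // follow the alias chain downstream
  }
  std::vector<ObjRef> result;
  collect_upstream(reverse_targets, objref, result);
  return result;
}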
// This method defines the order in which locks should be acquired. // This method defines the order in which locks should be acquired.
void SchedulerService::do_on_locks(bool lock) { void SchedulerService::do_on_locks(bool lock) {
std::mutex *mutexes[] = { std::mutex *mutexes[] = {
&successful_tasks_lock_, &successful_tasks_.mutex(),
&failed_tasks_lock_, &failed_tasks_.mutex(),
&get_queue_lock_, &get_queue_.mutex(),
&computation_graph_lock_, &computation_graph_.mutex(),
&fntable_lock_, &fntable_.mutex(),
&avail_workers_lock_, &avail_workers_.mutex(),
&task_queue_lock_, &task_queue_.mutex(),
&reference_counts_lock_, &reference_counts_.mutex(),
&contained_objrefs_lock_, &contained_objrefs_.mutex(),
&workers_lock_, &workers_.mutex(),
&alias_notification_queue_lock_, &alias_notification_queue_.mutex(),
&objects_lock_, &objtable_.mutex(),
&objstores_lock_, &objstores_.mutex(),
&target_objrefs_lock_, &target_objrefs_.mutex(),
&reverse_target_objrefs_lock_ &reverse_target_objrefs_.mutex()
}; };
size_t n = sizeof(mutexes) / sizeof(*mutexes); size_t n = sizeof(mutexes) / sizeof(*mutexes);
for (size_t i = 0; i != n; ++i) { for (size_t i = 0; i != n; ++i) {
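The body of this loop lies outside the hunk shown here. As a rough sketch of the idea (assumed behavior, not the actual implementation): the helper acquires every mutex in the single canonical order listed above, or releases them all, so that no two code paths ever lock the same pair of structures in opposite orders and deadlock.

#include <cstddef>
#include <mutex>

// Sketch only: acquire all mutexes in one canonical order, or release them,
// so the relative locking order is the same everywhere in the scheduler.
void do_on_locks_sketch(std::mutex* mutexes[], std::size_t n, bool lock) {
  if (lock) {
    for (std::size_t i = 0; i != n; ++i) {
      mutexes[i]->lock();        // acquire in the canonical order
    }
  } else {
    for (std::size_t i = n; i != 0; --i) {
      mutexes[i - 1]->unlock();  // release in reverse order
    }
  }
}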
View file
@ -14,6 +14,7 @@
#include "ray.grpc.pb.h" #include "ray.grpc.pb.h"
#include "types.pb.h" #include "types.pb.h"
#include "utils.h"
#include "computation_graph.h" #include "computation_graph.h"
using grpc::Server; using grpc::Server;
@ -133,32 +134,26 @@ private:
// The computation graph tracks the operations that have been submitted to the // The computation graph tracks the operations that have been submitted to the
// scheduler and is mostly used for fault tolerance. // scheduler and is mostly used for fault tolerance.
ComputationGraph computation_graph_; Synchronized<ComputationGraph> computation_graph_;
std::mutex computation_graph_lock_;
// Vector of all workers registered in the system. Their index in this vector // Vector of all workers registered in the system. Their index in this vector
// is the workerid. // is the workerid.
std::vector<WorkerHandle> workers_; Synchronized<std::vector<WorkerHandle> > workers_;
std::mutex workers_lock_;
// Vector of all workers that are currently idle. // Vector of all workers that are currently idle.
std::vector<WorkerId> avail_workers_; Synchronized<std::vector<WorkerId> > avail_workers_;
std::mutex avail_workers_lock_;
// Vector of all object stores registered in the system. Their index in this // Vector of all object stores registered in the system. Their index in this
// vector is the objstoreid. // vector is the objstoreid.
std::vector<ObjStoreHandle> objstores_; Synchronized<std::vector<ObjStoreHandle> > objstores_;
grpc::mutex objstores_lock_;
// Mapping from an aliased objref to the objref it is aliased with. If an // Mapping from an aliased objref to the objref it is aliased with. If an
// objref is a canonical objref (meaning it is not aliased), then // objref is a canonical objref (meaning it is not aliased), then
// target_objrefs_[objref] == objref. For each objref, target_objrefs_[objref] // target_objrefs_[objref] == objref. For each objref, target_objrefs_[objref]
// is initialized to UNITIALIZED_ALIAS and the correct value is filled later // is initialized to UNITIALIZED_ALIAS and the correct value is filled later
// when it is known. // when it is known.
std::vector<ObjRef> target_objrefs_; Synchronized<std::vector<ObjRef> > target_objrefs_;
std::mutex target_objrefs_lock_;
// This data structure maps an objref to all of the objrefs that alias it (there could be multiple such objrefs). // This data structure maps an objref to all of the objrefs that alias it (there could be multiple such objrefs).
std::vector<std::vector<ObjRef> > reverse_target_objrefs_; Synchronized<std::vector<std::vector<ObjRef> > > reverse_target_objrefs_;
std::mutex reverse_target_objrefs_lock_;
// Mapping from canonical objref to list of object stores where the object is stored. Non-canonical (aliased) objrefs should not be used to index objtable_. // Mapping from canonical objref to list of object stores where the object is stored. Non-canonical (aliased) objrefs should not be used to index objtable_.
ObjTable objtable_; Synchronized<ObjTable> objtable_; // This lock protects objtable_ and objects_in_transit_
std::mutex objects_lock_; // This lock protects objtable_ and objects_in_transit_
// For each object store objstoreid, objects_in_transit_[objstoreid] is a // For each object store objstoreid, objects_in_transit_[objstoreid] is a
// vector of the canonical object references that are being streamed to that // vector of the canonical object references that are being streamed to that
// object store but are not yet present. Object references are added to this // object store but are not yet present. Object references are added to this
@ -166,36 +161,29 @@ private:
// the same object to a given object store twice), and object references are // the same object to a given object store twice), and object references are
// removed when add_location is called (from ObjReady), and they are moved to // removed when add_location is called (from ObjReady), and they are moved to
// the objtable_. Note that objects_in_transit_ and objtable_ share the same // the objtable_. Note that objects_in_transit_ and objtable_ share the same
// lock (objects_lock_). // lock (objects_lock_). // TODO(rkn): Consider making this part of the
// objtable data structure.
std::vector<std::vector<ObjRef> > objects_in_transit_; std::vector<std::vector<ObjRef> > objects_in_transit_;
// Hash map from function names to workers where the function is registered. // Hash map from function names to workers where the function is registered.
FnTable fntable_; Synchronized<FnTable> fntable_;
std::mutex fntable_lock_;
// List of pending tasks. // List of pending tasks.
std::deque<OperationId> task_queue_; Synchronized<std::deque<OperationId> > task_queue_;
std::mutex task_queue_lock_;
// List of pending get calls. // List of pending get calls.
std::vector<std::pair<WorkerId, ObjRef> > get_queue_; Synchronized<std::vector<std::pair<WorkerId, ObjRef> > > get_queue_;
std::mutex get_queue_lock_;
// List of failed tasks // List of failed tasks
std::vector<TaskStatus> failed_tasks_; Synchronized<std::vector<TaskStatus> > failed_tasks_;
std::mutex failed_tasks_lock_;
// List of the IDs of successful tasks // List of the IDs of successful tasks
std::vector<OperationId> successful_tasks_; // Right now, we only use this information in the TaskInfo call. Synchronized<std::vector<OperationId> > successful_tasks_; // Right now, we only use this information in the TaskInfo call.
std::mutex successful_tasks_lock_;
// List of pending alias notifications. Each element consists of (objstoreid, (alias_objref, canonical_objref)). // List of pending alias notifications. Each element consists of (objstoreid, (alias_objref, canonical_objref)).
std::vector<std::pair<ObjStoreId, std::pair<ObjRef, ObjRef> > > alias_notification_queue_; Synchronized<std::vector<std::pair<ObjStoreId, std::pair<ObjRef, ObjRef> > > > alias_notification_queue_;
std::mutex alias_notification_queue_lock_;
// Reference counts. Currently, reference_counts_[objref] is the number of // Reference counts. Currently, reference_counts_[objref] is the number of
// existing references held to objref. This is done for all objrefs, not just // existing references held to objref. This is done for all objrefs, not just
// canonical_objrefs. This data structure completely ignores aliasing. If the // canonical_objrefs. This data structure completely ignores aliasing. If the
// object corresponding to objref has been deallocated, then // object corresponding to objref has been deallocated, then
// reference_counts[objref] will equal DEALLOCATED. // reference_counts[objref] will equal DEALLOCATED.
std::vector<RefCount> reference_counts_; Synchronized<std::vector<RefCount> > reference_counts_;
std::mutex reference_counts_lock_;
// contained_objrefs_[objref] is a vector of all of the objrefs contained inside the object referred to by objref // contained_objrefs_[objref] is a vector of all of the objrefs contained inside the object referred to by objref
std::vector<std::vector<ObjRef> > contained_objrefs_; Synchronized<std::vector<std::vector<ObjRef> > > contained_objrefs_;
std::mutex contained_objrefs_lock_;
// the scheduling algorithm that will be used // the scheduling algorithm that will be used
SchedulingAlgorithmType scheduling_algorithm_; SchedulingAlgorithmType scheduling_algorithm_;
}; };
View file
@ -34,6 +34,7 @@ class Synchronized {
T value_; T value_;
public: public:
typedef T element_type; typedef T element_type;
typedef Mutex mutex_type;
template<class... U> template<class... U>
Synchronized(U&&... args) : value_(std::forward<T>(args)...) { } Synchronized(U&&... args) : value_(std::forward<T>(args)...) { }
Synchronized(const Synchronized& other) : value_(*other) { } Synchronized(const Synchronized& other) : value_(*other) { }
@ -47,6 +48,7 @@ public:
SynchronizedPtr<const Synchronized> get() const { return *this; } SynchronizedPtr<const Synchronized> get() const { return *this; }
element_type* unsafe_get() { return &value_; } element_type* unsafe_get() { return &value_; }
const element_type* unsafe_get() const { return &value_; } const element_type* unsafe_get() const { return &value_; }
mutex_type& mutex() { return mutex_; }
}; };
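For context, a simplified sketch of what a Synchronized wrapper with this interface might look like (the real class in utils.h may differ; the LockedPtr proxy name here is invented for illustration). get() returns a RAII proxy that holds the mutex for as long as the proxy is alive, unsafe_get() skips locking for callers that already hold the lock, and mutex() exposes the mutex for lock-ordering code such as do_on_locks.

#include <deque>
#include <iostream>
#include <mutex>

template <class T, class Mutex = std::mutex>
class Synchronized {
  Mutex mutex_;
  T value_;

  // RAII proxy (invented name): holds the mutex for its entire lifetime.
  class LockedPtr {
    std::unique_lock<Mutex> lock_;
    T* value_;
  public:
    LockedPtr(Mutex& mutex, T& value) : lock_(mutex), value_(&value) { }
    T* operator->() { return value_; }
    T& operator*() { return *value_; }
  };

public:
  typedef T element_type;
  typedef Mutex mutex_type;
  LockedPtr get() { return LockedPtr(mutex_, value_); }  // locked access
  element_type* unsafe_get() { return &value_; }         // caller holds the lock
  mutex_type& mutex() { return mutex_; }                 // for lock-ordering helpers
};

int main() {
  Synchronized<std::deque<int> > task_queue;
  task_queue.get()->push_back(42);                 // lock held only for this call
  std::cout << task_queue.get()->front() << std::endl;
  return 0;
}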
std::string::iterator split_ip_address(std::string& ip_address); std::string::iterator split_ip_address(std::string& ip_address);