Clean up locks in the scheduler

Robert Nishihara 2016-06-18 00:11:46 -07:00
parent 0e5feecd65
commit 3ae07b293e
2 changed files with 158 additions and 91 deletions

View file

@@ -40,14 +40,16 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ
operation->set_allocated_task(task.release());
OperationId creator_operationid = ROOT_OPERATION; // TODO(rkn): Later, this should be the ID of the task that spawned this current task.
operation->set_creator_operationid(creator_operationid);
computation_graph_lock_.lock();
OperationId operationid = computation_graph_.add_operation(std::move(operation));
computation_graph_lock_.unlock();
task_queue_lock_.lock();
task_queue_.push_back(operationid);
task_queue_lock_.unlock();
OperationId operationid;
{
std::lock_guard<std::mutex> computation_graph_lock(computation_graph_lock_);
operationid = computation_graph_.add_operation(std::move(operation));
}
{
std::lock_guard<std::mutex> task_queue_lock(task_queue_lock_);
task_queue_.push_back(operationid);
}
schedule();
}
return Status::OK;
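
This hunk shows the pattern that recurs throughout the commit: bare lock()/unlock() pairs are replaced with scoped std::lock_guard blocks, which release the mutex on every exit path (including exceptions) and keep each critical section explicit and short. A minimal sketch of the difference, using hypothetical names (queue_lock, push_manual, push_raii) rather than the scheduler's own:

#include <mutex>
#include <vector>

std::mutex queue_lock;
std::vector<int> queue;

// With a bare lock()/unlock() pair, an exception or early return between the
// two calls leaves the mutex locked forever.
void push_manual(int v) {
  queue_lock.lock();
  queue.push_back(v);  // may throw std::bad_alloc; unlock() would never run
  queue_lock.unlock();
}

// std::lock_guard unlocks in its destructor, so the mutex is released on
// every path out of the braced scope. The extra braces also end the critical
// section before any slower follow-up work (such as the schedule() call above).
void push_raii(int v) {
  {
    std::lock_guard<std::mutex> lock(queue_lock);
    queue.push_back(v);
  }
}
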
@@ -62,16 +64,17 @@ Status SchedulerService::PushObj(ServerContext* context, const PushObjRequest* r
}
Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequest* request, AckReply* reply) {
objtable_lock_.lock();
size_t size = objtable_.size();
objtable_lock_.unlock();
size_t size;
{
std::lock_guard<std::mutex> objects_lock(objects_lock_);
size = objtable_.size();
}
ObjRef objref = request->objref();
RAY_CHECK_LT(objref, size, "internal error: no object with objref " << objref << " exists");
pull_queue_lock_.lock();
pull_queue_.push_back(std::make_pair(request->workerid(), objref));
pull_queue_lock_.unlock();
{
std::lock_guard<std::mutex> pull_queue_lock(pull_queue_lock_);
pull_queue_.push_back(std::make_pair(request->workerid(), objref));
}
schedule();
return Status::OK;
}
@@ -81,9 +84,11 @@ Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefs
ObjRef target_objref = request->target_objref();
RAY_LOG(RAY_ALIAS, "Aliasing objref " << alias_objref << " with objref " << target_objref);
RAY_CHECK_NEQ(alias_objref, target_objref, "internal error: attempting to alias objref " << alias_objref << " with itself.");
objtable_lock_.lock();
size_t size = objtable_.size();
objtable_lock_.unlock();
size_t size;
{
std::lock_guard<std::mutex> objects_lock(objects_lock_);
size = objtable_.size();
}
RAY_CHECK_LT(alias_objref, size, "internal error: no object with objref " << alias_objref << " exists");
RAY_CHECK_LT(target_objref, size, "internal error: no object with objref " << target_objref << " exists");
{
@@ -100,6 +105,7 @@ Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefs
}
Status SchedulerService::RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) {
std::lock_guard<std::mutex> objects_lock(objects_lock_); // to protect objects_in_transit_
std::lock_guard<std::mutex> objstore_lock(objstores_lock_);
ObjStoreId objstoreid = objstores_.size();
auto channel = grpc::CreateChannel(request->objstore_address(), grpc::InsecureChannelCredentials());
@@ -108,6 +114,7 @@ Status SchedulerService::RegisterObjStore(ServerContext* context, const Register
objstores_[objstoreid].channel = channel;
objstores_[objstoreid].objstore_stub = ObjStore::NewStub(channel);
reply->set_objstoreid(objstoreid);
objects_in_transit_.push_back(std::vector<ObjRef>());
return Status::OK;
}
@@ -215,6 +222,24 @@ Status SchedulerService::TaskInfo(ServerContext* context, const TaskInfoRequest*
return Status::OK;
}
void SchedulerService::deliver_object_if_necessary(ObjRef canonical_objref, ObjStoreId from, ObjStoreId to) {
bool object_present_or_in_transit;
{
std::lock_guard<std::mutex> objects_lock(objects_lock_);
auto &objstores = objtable_[canonical_objref];
bool object_present = std::binary_search(objstores.begin(), objstores.end(), to);
auto &objects_in_flight = objects_in_transit_[to];
bool object_in_transit = (std::find(objects_in_flight.begin(), objects_in_flight.end(), canonical_objref) != objects_in_flight.end());
object_present_or_in_transit = object_present || object_in_transit;
if (!object_present_or_in_transit) {
objects_in_flight.push_back(canonical_objref);
}
}
if (!object_present_or_in_transit) {
deliver_object(canonical_objref, from, to);
}
}
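
The new deliver_object_if_necessary performs the presence check and the in-transit marking inside a single critical section, so two concurrent callers cannot both decide to start a delivery of the same object; the RPC itself (deliver_object) is issued only after objects_lock_ has been released. A stripped-down sketch of this check-then-mark pattern, with illustrative stand-in names:

#include <algorithm>
#include <mutex>
#include <vector>

std::mutex state_lock;        // stand-in for objects_lock_
std::vector<int> in_transit;  // stand-in for objects_in_transit_[to]

// Returns true if the caller has claimed the delivery of objref. Because the
// lookup and the push_back happen under one lock, at most one caller wins.
bool claim_delivery(int objref) {
  std::lock_guard<std::mutex> lock(state_lock);
  if (std::find(in_transit.begin(), in_transit.end(), objref) != in_transit.end()) {
    return false;  // the object is already present or being delivered
  }
  in_transit.push_back(objref);
  return true;
}

// Usage: the slow network call runs outside the lock.
//   if (claim_delivery(objref)) { /* issue the delivery RPC */ }
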
// TODO(rkn): This could execute multiple times with the same arguments before
// the delivery finishes, but we only want it to happen once. Currently, the
// redundancy is handled by the object store, which will only execute the
@@ -222,13 +247,12 @@ Status SchedulerService::TaskInfo(ServerContext* context, const TaskInfoRequest*
// future.
//
// deliver_object assumes that the aliasing for objref has already been completed. That is, has_canonical_objref(objref) == true
void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to) {
RAY_CHECK_NEQ(from, to, "attempting to deliver objref " << objref << " from objstore " << from << " to itself.");
RAY_CHECK(has_canonical_objref(objref), "attempting to deliver objref " << objref << ", but this objref does not yet have a canonical objref.");
void SchedulerService::deliver_object(ObjRef canonical_objref, ObjStoreId from, ObjStoreId to) {
RAY_CHECK_NEQ(from, to, "attempting to deliver canonical_objref " << canonical_objref << " from objstore " << from << " to itself.");
RAY_CHECK(is_canonical(canonical_objref), "attempting to deliver objref " << canonical_objref << ", but this objref is not a canonical objref.");
ClientContext context;
AckReply reply;
StartDeliveryRequest request;
ObjRef canonical_objref = get_canonical_objref(objref);
request.set_objref(canonical_objref);
std::lock_guard<std::mutex> lock(objstores_lock_);
request.set_objstore_address(objstores_[from].address);
@@ -251,6 +275,7 @@ void SchedulerService::schedule() {
// assign_task assumes that computation_graph_lock_ has been acquired.
// assign_task assumes that the canonical objrefs for its arguments are all ready, that is, has_canonical_objref() is true for all of the call's arguments
void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) {
ObjStoreId objstoreid = get_store(workerid);
const Task& task = computation_graph_.get_task(operationid);
ClientContext context;
ExecuteTaskRequest request;
@@ -263,26 +288,23 @@ void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) {
{
// Notify the relevant objstore about potential aliasing when it's ready
std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_);
alias_notification_queue_.push_back(std::make_pair(get_store(workerid), std::make_pair(objref, canonical_objref)));
alias_notification_queue_.push_back(std::make_pair(objstoreid, std::make_pair(objref, canonical_objref)));
}
attempt_notify_alias(get_store(workerid), objref, canonical_objref);
attempt_notify_alias(objstoreid, objref, canonical_objref);
RAY_LOG(RAY_DEBUG, "task contains object ref " << canonical_objref);
std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
auto &objstores = objtable_[canonical_objref];
std::lock_guard<std::mutex> workers_lock(workers_lock_);
if (!std::binary_search(objstores.begin(), objstores.end(), workers_[workerid].objstoreid)) { // TODO(rkn): replace this with get_store
deliver_object(canonical_objref, pick_objstore(canonical_objref), workers_[workerid].objstoreid); // TODO(rkn): replace this with get_store
}
deliver_object_if_necessary(canonical_objref, pick_objstore(canonical_objref), objstoreid);
}
}
workers_[workerid].current_task = operationid;
request.mutable_task()->CopyFrom(task); // TODO(rkn): Is ownership handled properly here?
Status status = workers_[workerid].worker_stub->ExecuteTask(&context, request, &reply);
{
std::lock_guard<std::mutex> workers_lock(workers_lock_);
workers_[workerid].current_task = operationid;
request.mutable_task()->CopyFrom(task); // TODO(rkn): Is ownership handled properly here?
Status status = workers_[workerid].worker_stub->ExecuteTask(&context, request, &reply);
}
}
bool SchedulerService::can_run(const Task& task) {
std::lock_guard<std::mutex> lock(objtable_lock_);
std::lock_guard<std::mutex> lock(objects_lock_);
for (int i = 0; i < task.arg_size(); ++i) {
if (!task.arg(i).has_obj()) {
ObjRef objref = task.arg(i).ref();
@@ -310,20 +332,22 @@ std::pair<WorkerId, ObjStoreId> SchedulerService::register_worker(const std::str
}
}
if (objstoreid == std::numeric_limits<size_t>::max()) {
std::this_thread::sleep_for (std::chrono::milliseconds(100));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
RAY_CHECK_NEQ(objstoreid, std::numeric_limits<size_t>::max(), "object store with address " << objstore_address << " not yet registered");
workers_lock_.lock();
WorkerId workerid = workers_.size();
workers_.push_back(WorkerHandle());
auto channel = grpc::CreateChannel(worker_address, grpc::InsecureChannelCredentials());
workers_[workerid].channel = channel;
workers_[workerid].objstoreid = objstoreid;
workers_[workerid].worker_stub = WorkerService::NewStub(channel);
workers_[workerid].worker_address = worker_address;
workers_[workerid].current_task = NO_OPERATION;
workers_lock_.unlock();
WorkerId workerid;
{
std::lock_guard<std::mutex> workers_lock(workers_lock_);
workerid = workers_.size();
workers_.push_back(WorkerHandle());
auto channel = grpc::CreateChannel(worker_address, grpc::InsecureChannelCredentials());
workers_[workerid].channel = channel;
workers_[workerid].objstoreid = objstoreid;
workers_[workerid].worker_stub = WorkerService::NewStub(channel);
workers_[workerid].worker_address = worker_address;
workers_[workerid].current_task = NO_OPERATION;
}
return std::make_pair(workerid, objstoreid);
}
@@ -332,7 +356,7 @@ ObjRef SchedulerService::register_new_object() {
// TODO(rkn): increment/decrement_reference_count also acquire reference_counts_lock_ and target_objrefs_lock_ (through has_canonical_objref()), which caused deadlock in the past
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_);
std::lock_guard<std::mutex> contained_objrefs_lock(contained_objrefs_lock_);
std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
std::lock_guard<std::mutex> objects_lock(objects_lock_);
std::lock_guard<std::mutex> target_objrefs_lock(target_objrefs_lock_);
std::lock_guard<std::mutex> reverse_target_objrefs_lock(reverse_target_objrefs_lock_);
ObjRef objtable_size = objtable_.size();
@@ -355,13 +379,16 @@ ObjRef SchedulerService::register_new_object() {
void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstoreid) {
// add_location must be called with a canonical objref
RAY_CHECK(is_canonical(canonical_objref), "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")");
std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
std::lock_guard<std::mutex> objects_lock(objects_lock_);
RAY_CHECK_LT(canonical_objref, objtable_.size(), "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")");
// do a binary search
auto pos = std::lower_bound(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid);
if (pos == objtable_[canonical_objref].end() || objstoreid < *pos) {
objtable_[canonical_objref].insert(pos, objstoreid);
auto &objstores = objtable_[canonical_objref];
auto pos = std::lower_bound(objstores.begin(), objstores.end(), objstoreid);
if (pos == objstores.end() || objstoreid < *pos) {
objstores.insert(pos, objstoreid);
}
auto &objects_in_flight = objects_in_transit_[objstoreid];
objects_in_flight.erase(std::remove(objects_in_flight.begin(), objects_in_flight.end(), canonical_objref), objects_in_flight.end());
}
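
add_location leans on two standard-library idioms: keeping each objtable_ row sorted by inserting at the position std::lower_bound returns (so the std::binary_search calls elsewhere in the file stay valid), and the erase-remove idiom for clearing the objref out of the in-transit list. Both in isolation, as a sketch with generic names:

#include <algorithm>
#include <vector>

// Insert value into a sorted vector, skipping duplicates; the vector stays
// sorted, so membership tests can use std::binary_search in O(log n).
void insert_sorted_unique(std::vector<int> &v, int value) {
  auto pos = std::lower_bound(v.begin(), v.end(), value);
  if (pos == v.end() || value < *pos) {
    v.insert(pos, value);
  }
}

// Erase-remove: std::remove compacts the surviving elements to the front and
// returns the new logical end; erase then trims the tail. All occurrences of
// value are removed in a single pass.
void drop_all(std::vector<int> &v, int value) {
  v.erase(std::remove(v.begin(), v.end(), value), v.end());
}
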
void SchedulerService::add_canonical_objref(ObjRef objref) {
@@ -385,18 +412,7 @@ void SchedulerService::register_function(const std::string& name, WorkerId worke
}
void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerInfoReply* reply) {
// TODO(rkn): Also grab the objstores_lock_
// alias_notification_queue_lock_ may need to come before objtable_lock_
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_);
std::lock_guard<std::mutex> contained_objrefs_lock(contained_objrefs_lock_);
std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
std::lock_guard<std::mutex> pull_queue_lock(pull_queue_lock_);
std::lock_guard<std::mutex> target_objrefs_lock(target_objrefs_lock_);
std::lock_guard<std::mutex> reverse_target_objrefs_lock(reverse_target_objrefs_lock_);
std::lock_guard<std::mutex> fntable_lock(fntable_lock_);
std::lock_guard<std::mutex> avail_workers_lock(avail_workers_lock_);
std::lock_guard<std::mutex> task_queue_lock(task_queue_lock_);
std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_);
acquire_all_locks();
for (int i = 0; i < reference_counts_.size(); ++i) {
reply->add_reference_count(reference_counts_[i]);
}
@@ -416,10 +432,10 @@ void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerIn
for (const WorkerId& entry : avail_workers_) {
reply->add_avail_worker(entry);
}
release_all_locks();
}
// pick_objstore assumes that objtable_lock_ has been acquired
// pick_objstore assumes that objects_lock_ has been acquired
// pick_objstore must be called with a canonical_objref
ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) {
std::mt19937 rng;
@@ -442,27 +458,20 @@ void SchedulerService::perform_pulls() {
const std::pair<WorkerId, ObjRef>& pull = pull_queue_[i];
ObjRef objref = pull.second;
WorkerId workerid = pull.first;
ObjStoreId objstoreid = get_store(workerid);
if (!has_canonical_objref(objref)) {
RAY_LOG(RAY_ALIAS, "objref " << objref << " does not have a canonical_objref, so continuing");
continue;
}
ObjRef canonical_objref = get_canonical_objref(objref);
RAY_LOG(RAY_DEBUG, "attempting to pull objref " << pull.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid));
objtable_lock_.lock();
int num_stores = objtable_[canonical_objref].size();
objtable_lock_.unlock();
int num_stores;
{
std::lock_guard<std::mutex> objects_lock(objects_lock_);
num_stores = objtable_[canonical_objref].size();
}
if (num_stores > 0) {
{
std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), get_store(workerid))) {
// The worker's local object store does not already contain objref, so ship
// it there from an object store that does have it.
ObjStoreId objstoreid = pick_objstore(canonical_objref);
deliver_object(canonical_objref, objstoreid, get_store(workerid));
}
}
deliver_object_if_necessary(canonical_objref, pick_objstore(canonical_objref), objstoreid);
{
// Notify the relevant objstore about potential aliasing when it's ready
std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_);
@@ -511,7 +520,7 @@ void SchedulerService::schedule_tasks_location_aware() {
for (int i = 0; i < avail_workers_.size(); ++i) {
// Submit all tasks whose arguments are ready.
WorkerId workerid = avail_workers_[i];
ObjStoreId objstoreid = workers_[workerid].objstoreid;
ObjStoreId objstoreid = get_store(workerid);
auto bestit = task_queue_.end(); // keep track of the task that fits the worker best so far
size_t min_num_shipped_objects = std::numeric_limits<size_t>::max(); // number of objects that need to be transferred for this worker
for (auto it = task_queue_.begin(); it != task_queue_.end(); ++it) {
@@ -526,9 +535,12 @@ void SchedulerService::schedule_tasks_location_aware() {
ObjRef objref = task.arg(j).ref();
RAY_CHECK(has_canonical_objref(objref), "no canonical object ref found even though task is ready; that should not be possible!");
ObjRef canonical_objref = get_canonical_objref(objref);
// check if the object is already in the local object store
if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) {
num_shipped_objects += 1;
{
// check if the object is already in the local object store
std::lock_guard<std::mutex> objects_lock(objects_lock_);
if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) {
num_shipped_objects += 1;
}
}
}
}
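
This loop implements the location-aware policy: for each available worker, scan the task queue, count how many argument objects would have to be shipped to that worker's object store, and remember the task with the smallest count; the membership test is now guarded by objects_lock_ for each lookup. The selection logic, reduced to its core with hypothetical inputs:

#include <cstddef>
#include <limits>
#include <vector>

// Given, for each queued task, how many of its argument objects the worker's
// local store is missing, return the index of the cheapest task to place
// there (or missing_counts.size() if the queue is empty).
std::size_t pick_cheapest_task(const std::vector<std::size_t> &missing_counts) {
  std::size_t best = missing_counts.size();
  std::size_t best_cost = std::numeric_limits<std::size_t>::max();
  for (std::size_t i = 0; i != missing_counts.size(); ++i) {
    if (missing_counts[i] < best_cost) {
      best_cost = missing_counts[i];
      best = i;
    }
  }
  return best;
}
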
@@ -602,7 +614,7 @@ bool SchedulerService::attempt_notify_alias(ObjStoreId objstoreid, ObjRef alias_
return true;
}
{
std::lock_guard<std::mutex> lock(objtable_lock_);
std::lock_guard<std::mutex> lock(objects_lock_);
if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) {
// the objstore doesn't have the object for canonical_objref yet, so it's too early to notify the objstore about the alias
return false;
@@ -613,9 +625,10 @@ bool SchedulerService::attempt_notify_alias(ObjStoreId objstoreid, ObjRef alias_
NotifyAliasRequest request;
request.set_alias_objref(alias_objref);
request.set_canonical_objref(canonical_objref);
objstores_lock_.lock();
objstores_[objstoreid].objstore_stub->NotifyAlias(&context, request, &reply);
objstores_lock_.unlock();
{
std::lock_guard<std::mutex> objstores_lock(objstores_lock_);
objstores_[objstoreid].objstore_stub->NotifyAlias(&context, request, &reply);
}
return true;
}
@@ -627,7 +640,7 @@ void SchedulerService::deallocate_object(ObjRef canonical_objref) {
// DecrementRefCount).
RAY_LOG(RAY_REFCOUNT, "Deallocating canonical_objref " << canonical_objref << ".");
{
std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
std::lock_guard<std::mutex> objects_lock(objects_lock_);
auto &objstores = objtable_[canonical_objref];
std::lock_guard<std::mutex> objstores_lock(objstores_lock_); // TODO(rkn): Should this be inside the for loop instead?
for (int i = 0; i < objstores.size(); ++i) {
@@ -702,6 +715,41 @@ void SchedulerService::get_equivalent_objrefs(ObjRef objref, std::vector<ObjRef>
upstream_objrefs(downstream_objref, equivalent_objrefs);
}
// This method defines the order in which locks should be acquired.
void SchedulerService::do_on_locks(bool lock) {
std::mutex *mutexes[] = {
&pull_queue_lock_,
&computation_graph_lock_,
&fntable_lock_,
&avail_workers_lock_,
&task_queue_lock_,
&alias_notification_queue_lock_,
&workers_lock_,
&reference_counts_lock_,
&contained_objrefs_lock_,
&objects_lock_,
&objstores_lock_,
&target_objrefs_lock_,
&reverse_target_objrefs_lock_,
};
size_t n = sizeof(mutexes) / sizeof(*mutexes);
for (size_t i = 0; i != n; ++i) {
if (lock) {
mutexes[i]->lock();
} else {
mutexes[n - i - 1]->unlock();
}
}
}
void SchedulerService::acquire_all_locks() {
do_on_locks(true);
}
void SchedulerService::release_all_locks() {
do_on_locks(false);
}
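
do_on_locks fixes one global acquisition order for every mutex in the scheduler and releases them in the reverse order; as long as every code path that holds more than one of these locks respects the same order, a deadlock cycle cannot form. A toy illustration with two hypothetical mutexes:

#include <mutex>

std::mutex lock_a;  // ordered before lock_b by convention
std::mutex lock_b;

// Both threads take the locks in the agreed order a-then-b, so the classic
// deadlock (one thread holds a waiting for b while the other holds b waiting
// for a) is impossible.
void worker_one() {
  std::lock_guard<std::mutex> a(lock_a);
  std::lock_guard<std::mutex> b(lock_b);
  // ... critical section touching both structures ...
}

void worker_two() {
  std::lock_guard<std::mutex> a(lock_a);  // same order, even if lock_b is the one it mainly needs
  std::lock_guard<std::mutex> b(lock_b);
  // ... critical section ...
}

(std::lock, and C++17's std::scoped_lock, can acquire multiple mutexes deadlock-free without a manual ordering convention, but a fixed order like the one above also documents which locks nest inside which.)
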
void start_scheduler_service(const char* service_addr, SchedulingAlgorithmType scheduling_algorithm) {
std::string service_address(service_addr);
std::string::iterator split_point = split_ip_address(service_address);

View file

@@ -69,7 +69,11 @@ public:
Status SchedulerInfo(ServerContext* context, const SchedulerInfoRequest* request, SchedulerInfoReply* reply) override;
Status TaskInfo(ServerContext* context, const TaskInfoRequest* request, TaskInfoReply* reply) override;
// ask an object store to send object to another objectstore
// This will ask an object store to send an object to another object store if
// the object is not already present in that object store and is not already
// being transmitted.
void deliver_object_if_necessary(ObjRef objref, ObjStoreId from, ObjStoreId to);
// ask an object store to send an object to another object store
void deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to);
// assign a task to a worker
void schedule();
@@ -92,7 +96,7 @@ public:
// get information about the scheduler state
void get_info(const SchedulerInfoRequest& request, SchedulerInfoReply* reply);
private:
// pick an objectstore that holds a given object (needs protection by objtable_lock_)
// pick an objectstore that holds a given object (needs protection by objects_lock_)
ObjStoreId pick_objstore(ObjRef objref);
// checks if objref is a canonical objref
bool is_canonical(ObjRef objref);
@@ -120,6 +124,12 @@ private:
void upstream_objrefs(ObjRef objref, std::vector<ObjRef> &objrefs);
// Find all of the object references that refer to the same object as objref (as best as we can determine at the moment). The information may be incomplete because not all of the aliases may be known.
void get_equivalent_objrefs(ObjRef objref, std::vector<ObjRef> &equivalent_objrefs);
// acquires all locks; this should only be used by get_info and for fault tolerance
void acquire_all_locks();
// releases all locks; this should only be used by get_info and for fault tolerance
void release_all_locks();
// acquires or releases all of the locks. This is a single method to ensure a single canonical ordering of the locks.
void do_on_locks(bool lock);
// The computation graph tracks the operations that have been submitted to the
// scheduler and is mostly used for fault tolerance.
@@ -148,7 +158,16 @@ private:
std::mutex reverse_target_objrefs_lock_;
// Mapping from canonical objref to list of object stores where the object is stored. Non-canonical (aliased) objrefs should not be used to index objtable_.
ObjTable objtable_;
std::mutex objtable_lock_;
std::mutex objects_lock_; // This lock protects objtable_ and objects_in_transit_
// For each object store objstoreid, objects_in_transit_[objstoreid] is a
// vector of the canonical object references that are being streamed to that
// object store but are not yet present there. Object references are added to
// this vector in deliver_object_if_necessary (to ensure that we do not attempt
// to deliver the same object to a given object store twice) and are removed
// and recorded in objtable_ when add_location is called (from ObjReady). Note
// that objects_in_transit_ and objtable_ share the same lock (objects_lock_).
std::vector<std::vector<ObjRef> > objects_in_transit_;
// Hash map from function names to workers where the function is registered.
FnTable fntable_;
std::mutex fntable_lock_;