diff --git a/src/scheduler.cc b/src/scheduler.cc index 1a4f25f65..47cd17b84 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -40,14 +40,16 @@ Status SchedulerService::SubmitTask(ServerContext* context, const SubmitTaskRequ operation->set_allocated_task(task.release()); OperationId creator_operationid = ROOT_OPERATION; // TODO(rkn): Later, this should be the ID of the task that spawned this current task. operation->set_creator_operationid(creator_operationid); - computation_graph_lock_.lock(); - OperationId operationid = computation_graph_.add_operation(std::move(operation)); - computation_graph_lock_.unlock(); - - task_queue_lock_.lock(); - task_queue_.push_back(operationid); - task_queue_lock_.unlock(); + OperationId operationid; + { + std::lock_guard<std::mutex> computation_graph_lock(computation_graph_lock_); + operationid = computation_graph_.add_operation(std::move(operation)); + } + { + std::lock_guard<std::mutex> task_queue_lock(task_queue_lock_); + task_queue_.push_back(operationid); + } schedule(); } return Status::OK; @@ -62,16 +64,17 @@ Status SchedulerService::PushObj(ServerContext* context, const PushObjRequest* r } Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequest* request, AckReply* reply) { - objtable_lock_.lock(); - size_t size = objtable_.size(); - objtable_lock_.unlock(); - + size_t size; + { + std::lock_guard<std::mutex> objects_lock(objects_lock_); + size = objtable_.size(); + } ObjRef objref = request->objref(); RAY_CHECK_LT(objref, size, "internal error: no object with objref " << objref << " exists"); - - pull_queue_lock_.lock(); - pull_queue_.push_back(std::make_pair(request->workerid(), objref)); - pull_queue_lock_.unlock(); + { + std::lock_guard<std::mutex> pull_queue_lock(pull_queue_lock_); + pull_queue_.push_back(std::make_pair(request->workerid(), objref)); + } schedule(); return Status::OK; } @@ -81,9 +84,11 @@ Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefs ObjRef target_objref = request->target_objref();
RAY_LOG(RAY_ALIAS, "Aliasing objref " << alias_objref << " with objref " << target_objref); RAY_CHECK_NEQ(alias_objref, target_objref, "internal error: attempting to alias objref " << alias_objref << " with itself."); - objtable_lock_.lock(); - size_t size = objtable_.size(); - objtable_lock_.unlock(); + size_t size; + { + std::lock_guard<std::mutex> objects_lock(objects_lock_); + size = objtable_.size(); + } RAY_CHECK_LT(alias_objref, size, "internal error: no object with objref " << alias_objref << " exists"); RAY_CHECK_LT(target_objref, size, "internal error: no object with objref " << target_objref << " exists"); { @@ -100,6 +105,7 @@ Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefs } Status SchedulerService::RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) { + std::lock_guard<std::mutex> objects_lock(objects_lock_); // to protect objects_in_transit_ std::lock_guard<std::mutex> objstore_lock(objstores_lock_); ObjStoreId objstoreid = objstores_.size(); auto channel = grpc::CreateChannel(request->objstore_address(), grpc::InsecureChannelCredentials()); @@ -108,6 +114,7 @@ Status SchedulerService::RegisterObjStore(ServerContext* context, const Register objstores_[objstoreid].channel = channel; objstores_[objstoreid].objstore_stub = ObjStore::NewStub(channel); reply->set_objstoreid(objstoreid); + objects_in_transit_.push_back(std::vector<ObjRef>()); return Status::OK; } @@ -215,6 +222,24 @@ Status SchedulerService::TaskInfo(ServerContext* context, const TaskInfoRequest* return Status::OK; } +void SchedulerService::deliver_object_if_necessary(ObjRef canonical_objref, ObjStoreId from, ObjStoreId to) { + bool object_present_or_in_transit; + { + std::lock_guard<std::mutex> objects_lock(objects_lock_); + auto &objstores = objtable_[canonical_objref]; + bool object_present = std::binary_search(objstores.begin(), objstores.end(), to); + auto &objects_in_flight = objects_in_transit_[to]; + bool object_in_transit =
(std::find(objects_in_flight.begin(), objects_in_flight.end(), canonical_objref) != objects_in_flight.end()); + object_present_or_in_transit = object_present || object_in_transit; + if (!object_present_or_in_transit) { + objects_in_flight.push_back(canonical_objref); + } + } + if (!object_present_or_in_transit) { + deliver_object(canonical_objref, from, to); + } +} + // TODO(rkn): This could execute multiple times with the same arguments before // the delivery finishes, but we only want it to happen once. Currently, the // redundancy is handled by the object store, which will only execute the @@ -222,13 +247,12 @@ Status SchedulerService::TaskInfo(ServerContext* context, const TaskInfoRequest* // future. // // deliver_object assumes that the aliasing for objref has already been completed. That is, has_canonical_objref(objref) == true -void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to) { - RAY_CHECK_NEQ(from, to, "attempting to deliver objref " << objref << " from objstore " << from << " to itself."); - RAY_CHECK(has_canonical_objref(objref), "attempting to deliver objref " << objref << ", but this objref does not yet have a canonical objref."); +void SchedulerService::deliver_object(ObjRef canonical_objref, ObjStoreId from, ObjStoreId to) { + RAY_CHECK_NEQ(from, to, "attempting to deliver canonical_objref " << canonical_objref << " from objstore " << from << " to itself."); + RAY_CHECK(is_canonical(canonical_objref), "attempting to deliver objref " << canonical_objref << ", but this objref is not a canonical objref."); ClientContext context; AckReply reply; StartDeliveryRequest request; - ObjRef canonical_objref = get_canonical_objref(objref); request.set_objref(canonical_objref); std::lock_guard<std::mutex> lock(objstores_lock_); request.set_objstore_address(objstores_[from].address); @@ -251,6 +275,7 @@ void SchedulerService::schedule() { // assign_task assumes that computation_graph_lock_ has been acquired.
// assign_task assumes that the canonical objrefs for its arguments are all ready, that is has_canonical_objref() is true for all of the call's arguments void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) { + ObjStoreId objstoreid = get_store(workerid); const Task& task = computation_graph_.get_task(operationid); ClientContext context; ExecuteTaskRequest request; @@ -263,26 +288,23 @@ void SchedulerService::assign_task(OperationId operationid, WorkerId workerid) { { // Notify the relevant objstore about potential aliasing when it's ready std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_); - alias_notification_queue_.push_back(std::make_pair(get_store(workerid), std::make_pair(objref, canonical_objref))); + alias_notification_queue_.push_back(std::make_pair(objstoreid, std::make_pair(objref, canonical_objref))); } - attempt_notify_alias(get_store(workerid), objref, canonical_objref); - + attempt_notify_alias(objstoreid, objref, canonical_objref); RAY_LOG(RAY_DEBUG, "task contains object ref " << canonical_objref); - std::lock_guard<std::mutex> objtable_lock(objtable_lock_); - auto &objstores = objtable_[canonical_objref]; - std::lock_guard<std::mutex> workers_lock(workers_lock_); - if (!std::binary_search(objstores.begin(), objstores.end(), workers_[workerid].objstoreid)) { // TODO(rkn): replace this with get_store - deliver_object(canonical_objref, pick_objstore(canonical_objref), workers_[workerid].objstoreid); // TODO(rkn): replace this with get_store - } + deliver_object_if_necessary(canonical_objref, pick_objstore(canonical_objref), objstoreid); } } - workers_[workerid].current_task = operationid; - request.mutable_task()->CopyFrom(task); // TODO(rkn): Is ownership handled properly here?
- Status status = workers_[workerid].worker_stub->ExecuteTask(&context, request, &reply); + { + std::lock_guard<std::mutex> workers_lock(workers_lock_); + workers_[workerid].current_task = operationid; + request.mutable_task()->CopyFrom(task); // TODO(rkn): Is ownership handled properly here? + Status status = workers_[workerid].worker_stub->ExecuteTask(&context, request, &reply); + } } bool SchedulerService::can_run(const Task& task) { - std::lock_guard<std::mutex> lock(objtable_lock_); + std::lock_guard<std::mutex> lock(objects_lock_); for (int i = 0; i < task.arg_size(); ++i) { if (!task.arg(i).has_obj()) { ObjRef objref = task.arg(i).ref(); @@ -310,20 +332,22 @@ std::pair<WorkerId, ObjStoreId> SchedulerService::register_worker(const std::str } } if (objstoreid == std::numeric_limits<ObjStoreId>::max()) { - std::this_thread::sleep_for (std::chrono::milliseconds(100)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } RAY_CHECK_NEQ(objstoreid, std::numeric_limits<ObjStoreId>::max(), "object store with address " << objstore_address << " not yet registered"); - workers_lock_.lock(); - WorkerId workerid = workers_.size(); - workers_.push_back(WorkerHandle()); - auto channel = grpc::CreateChannel(worker_address, grpc::InsecureChannelCredentials()); - workers_[workerid].channel = channel; - workers_[workerid].objstoreid = objstoreid; - workers_[workerid].worker_stub = WorkerService::NewStub(channel); - workers_[workerid].worker_address = worker_address; - workers_[workerid].current_task = NO_OPERATION; - workers_lock_.unlock(); + WorkerId workerid; + { + std::lock_guard<std::mutex> workers_lock(workers_lock_); + workerid = workers_.size(); + workers_.push_back(WorkerHandle()); + auto channel = grpc::CreateChannel(worker_address, grpc::InsecureChannelCredentials()); + workers_[workerid].channel = channel; + workers_[workerid].objstoreid = objstoreid; + workers_[workerid].worker_stub = WorkerService::NewStub(channel); + workers_[workerid].worker_address = worker_address; + workers_[workerid].current_task = NO_OPERATION; + } return
std::make_pair(workerid, objstoreid); } @@ -332,7 +356,7 @@ ObjRef SchedulerService::register_new_object() { // TODO(rkn): increment/decrement_reference_count also acquire reference_counts_lock_ and target_objrefs_lock_ (through has_canonical_objref()), which caused deadlock in the past std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); std::lock_guard<std::mutex> contained_objrefs_lock(contained_objrefs_lock_); - std::lock_guard<std::mutex> objtable_lock(objtable_lock_); + std::lock_guard<std::mutex> objects_lock(objects_lock_); std::lock_guard<std::mutex> target_objrefs_lock(target_objrefs_lock_); std::lock_guard<std::mutex> reverse_target_objrefs_lock(reverse_target_objrefs_lock_); ObjRef objtable_size = objtable_.size(); @@ -355,13 +379,16 @@ ObjRef SchedulerService::register_new_object() { void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstoreid) { // add_location must be called with a canonical objref RAY_CHECK(is_canonical(canonical_objref), "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")"); - std::lock_guard<std::mutex> objtable_lock(objtable_lock_); + std::lock_guard<std::mutex> objects_lock(objects_lock_); RAY_CHECK_LT(canonical_objref, objtable_.size(), "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")"); // do a binary search - auto pos = std::lower_bound(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid); - if (pos == objtable_[canonical_objref].end() || objstoreid < *pos) { - objtable_[canonical_objref].insert(pos, objstoreid); + auto &objstores = objtable_[canonical_objref]; + auto pos = std::lower_bound(objstores.begin(), objstores.end(), objstoreid); + if (pos == objstores.end() || objstoreid < *pos) { + objstores.insert(pos, objstoreid); } + auto &objects_in_flight = objects_in_transit_[objstoreid]; + objects_in_flight.erase(std::remove(objects_in_flight.begin(), objects_in_flight.end(), canonical_objref), objects_in_flight.end()); } void
SchedulerService::add_canonical_objref(ObjRef objref) { @@ -385,18 +412,7 @@ void SchedulerService::register_function(const std::string& name, WorkerId worke } void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerInfoReply* reply) { - // TODO(rkn): Also grab the objstores_lock_ - // alias_notification_queue_lock_ may need to come before objtable_lock_ - std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); - std::lock_guard<std::mutex> contained_objrefs_lock(contained_objrefs_lock_); - std::lock_guard<std::mutex> objtable_lock(objtable_lock_); - std::lock_guard<std::mutex> pull_queue_lock(pull_queue_lock_); - std::lock_guard<std::mutex> target_objrefs_lock(target_objrefs_lock_); - std::lock_guard<std::mutex> reverse_target_objrefs_lock(reverse_target_objrefs_lock_); - std::lock_guard<std::mutex> fntable_lock(fntable_lock_); - std::lock_guard<std::mutex> avail_workers_lock(avail_workers_lock_); - std::lock_guard<std::mutex> task_queue_lock(task_queue_lock_); - std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_); + acquire_all_locks(); for (int i = 0; i < reference_counts_.size(); ++i) { reply->add_reference_count(reference_counts_[i]); } @@ -416,10 +432,10 @@ void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerIn for (const WorkerId& entry : avail_workers_) { reply->add_avail_worker(entry); } - + release_all_locks(); } -// pick_objstore assumes that objtable_lock_ has been acquired +// pick_objstore assumes that objects_lock_ has been acquired // pick_objstore must be called with a canonical_objref ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) { std::mt19937 rng; @@ -442,27 +458,20 @@ void SchedulerService::perform_pulls() { const std::pair<WorkerId, ObjRef>& pull = pull_queue_[i]; ObjRef objref = pull.second; WorkerId workerid = pull.first; + ObjStoreId objstoreid = get_store(workerid); if (!has_canonical_objref(objref)) { RAY_LOG(RAY_ALIAS, "objref " << objref << " does not have a canonical_objref, so continuing"); continue; } ObjRef canonical_objref =
get_canonical_objref(objref); RAY_LOG(RAY_DEBUG, "attempting to pull objref " << pull.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid)); - - objtable_lock_.lock(); - int num_stores = objtable_[canonical_objref].size(); - objtable_lock_.unlock(); - + int num_stores; + { + std::lock_guard<std::mutex> objects_lock(objects_lock_); + num_stores = objtable_[canonical_objref].size(); + } if (num_stores > 0) { - { - std::lock_guard<std::mutex> objtable_lock(objtable_lock_); - if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), get_store(workerid))) { - // The worker's local object store does not already contain objref, so ship - // it there from an object store that does have it. - ObjStoreId objstoreid = pick_objstore(canonical_objref); - deliver_object(canonical_objref, objstoreid, get_store(workerid)); - } - } + deliver_object_if_necessary(canonical_objref, pick_objstore(canonical_objref), objstoreid); { // Notify the relevant objstore about potential aliasing when it's ready std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_); @@ -511,7 +520,7 @@ void SchedulerService::schedule_tasks_location_aware() { for (int i = 0; i < avail_workers_.size(); ++i) { // Submit all tasks whose arguments are ready.
WorkerId workerid = avail_workers_[i]; - ObjStoreId objstoreid = workers_[workerid].objstoreid; + ObjStoreId objstoreid = get_store(workerid); auto bestit = task_queue_.end(); // keep track of the task that fits the worker best so far size_t min_num_shipped_objects = std::numeric_limits<size_t>::max(); // number of objects that need to be transfered for this worker for (auto it = task_queue_.begin(); it != task_queue_.end(); ++it) { @@ -526,9 +535,12 @@ void SchedulerService::schedule_tasks_location_aware() { ObjRef objref = task.arg(j).ref(); RAY_CHECK(has_canonical_objref(objref), "no canonical object ref found even though task is ready; that should not be possible!"); ObjRef canonical_objref = get_canonical_objref(objref); - // check if the object is already in the local object store - if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) { - num_shipped_objects += 1; + { + // check if the object is already in the local object store + std::lock_guard<std::mutex> objects_lock(objects_lock_); + if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) { + num_shipped_objects += 1; + } } } } @@ -602,7 +614,7 @@ bool SchedulerService::attempt_notify_alias(ObjStoreId objstoreid, ObjRef alias_ return true; } { - std::lock_guard<std::mutex> lock(objtable_lock_); + std::lock_guard<std::mutex> lock(objects_lock_); if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) { // the objstore doesn't have the object for canonical_objref yet, so it's too early to notify the objstore about the alias return false; } } @@ -613,9 +625,10 @@ bool SchedulerService::attempt_notify_alias(ObjStoreId objstoreid, ObjRef alias_ NotifyAliasRequest request; request.set_alias_objref(alias_objref); request.set_canonical_objref(canonical_objref); - objstores_lock_.lock(); - objstores_[objstoreid].objstore_stub->NotifyAlias(&context, request, &reply); - objstores_lock_.unlock(); + { +
std::lock_guard<std::mutex> objstores_lock(objstores_lock_); + objstores_[objstoreid].objstore_stub->NotifyAlias(&context, request, &reply); + } return true; } @@ -627,7 +640,7 @@ void SchedulerService::deallocate_object(ObjRef canonical_objref) { // DecrementRefCount). RAY_LOG(RAY_REFCOUNT, "Deallocating canonical_objref " << canonical_objref << "."); { - std::lock_guard<std::mutex> objtable_lock(objtable_lock_); + std::lock_guard<std::mutex> objects_lock(objects_lock_); auto &objstores = objtable_[canonical_objref]; std::lock_guard<std::mutex> objstores_lock(objstores_lock_); // TODO(rkn): Should this be inside the for loop instead? for (int i = 0; i < objstores.size(); ++i) { @@ -702,6 +715,41 @@ void SchedulerService::get_equivalent_objrefs(ObjRef objref, std::vector upstream_objrefs(downstream_objref, equivalent_objrefs); } +// This method defines the order in which locks should be acquired. +void SchedulerService::do_on_locks(bool lock) { + std::mutex *mutexes[] = { + &pull_queue_lock_, + &computation_graph_lock_, + &fntable_lock_, + &avail_workers_lock_, + &task_queue_lock_, + &alias_notification_queue_lock_, + &workers_lock_, + &reference_counts_lock_, + &contained_objrefs_lock_, + &objects_lock_, + &objstores_lock_, + &target_objrefs_lock_, + &reverse_target_objrefs_lock_, + }; + size_t n = sizeof(mutexes) / sizeof(*mutexes); + for (size_t i = 0; i != n; ++i) { + if (lock) { + mutexes[i]->lock(); + } else { + mutexes[n - i - 1]->unlock(); + } + } +} + +void SchedulerService::acquire_all_locks() { + do_on_locks(true); +} + +void SchedulerService::release_all_locks() { + do_on_locks(false); +} + void start_scheduler_service(const char* service_addr, SchedulingAlgorithmType scheduling_algorithm) { std::string service_address(service_addr); std::string::iterator split_point = split_ip_address(service_address); diff --git a/src/scheduler.h b/src/scheduler.h index 6c246ed34..4520f4af8 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -69,7 +69,11 @@ public: Status SchedulerInfo(ServerContext* context,
const SchedulerInfoRequest* request, SchedulerInfoReply* reply) override; Status TaskInfo(ServerContext* context, const TaskInfoRequest* request, TaskInfoReply* reply) override; - // ask an object store to send object to another objectstore + // This will ask an object store to send an object to another object store if + // the object is not already present in that object store and is not already + // being transmitted. + void deliver_object_if_necessary(ObjRef objref, ObjStoreId from, ObjStoreId to); + // ask an object store to send object to another object store void deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to); // assign a task to a worker void schedule(); @@ -92,7 +96,7 @@ public: // get information about the scheduler state void get_info(const SchedulerInfoRequest& request, SchedulerInfoReply* reply); private: - // pick an objectstore that holds a given object (needs protection by objtable_lock_) + // pick an objectstore that holds a given object (needs protection by objects_lock_) ObjStoreId pick_objstore(ObjRef objref); // checks if objref is a canonical objref bool is_canonical(ObjRef objref); @@ -120,6 +124,12 @@ private: void upstream_objrefs(ObjRef objref, std::vector<ObjRef> &objrefs); // Find all of the object references that refer to the same object as objref (as best as we can determine at the moment). The information may be incomplete because not all of the aliases may be known. void get_equivalent_objrefs(ObjRef objref, std::vector<ObjRef> &equivalent_objrefs); + // acquires all locks, this should only be used by get_info and for fault tolerance + void acquire_all_locks(); + // release all locks, this should only be used by get_info and for fault tolerance + void release_all_locks(); + // acquire or release all the locks. This is a single method to ensure a single canonical ordering of the locks.
+ void do_on_locks(bool lock); // The computation graph tracks the operations that have been submitted to the // scheduler and is mostly used for fault tolerance. @@ -148,7 +158,16 @@ private: std::mutex reverse_target_objrefs_lock_; // Mapping from canonical objref to list of object stores where the object is stored. Non-canonical (aliased) objrefs should not be used to index objtable_. ObjTable objtable_; - std::mutex objtable_lock_; + std::mutex objects_lock_; // This lock protects objtable_ and objects_in_transit_ + // For each object store objstoreid, objects_in_transit_[objstoreid] is a + // vector of the canonical object references that are being streamed to that + // object store but are not yet present. Object references are added to this + // in deliver_object_if_necessary (to ensure that we do not attempt to deliver + // the same object to a given object store twice), and object references are + // removed when add_location is called (from ObjReady), and they are moved to + // the objtable_. Note that objects_in_transit_ and objtable_ share the same + // lock (objects_lock_). + std::vector<std::vector<ObjRef> > objects_in_transit_; // Hash map from function names to workers where the function is registered. FnTable fntable_; std::mutex fntable_lock_;