mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
fix bug in which object is deallocated before objready is called (#120)
This commit is contained in:
parent
a461c7af8b
commit
c7e14e0a8b
3 changed files with 44 additions and 10 deletions
|
@ -75,7 +75,7 @@ Status ObjStoreService::StartDelivery(ServerContext* context, const StartDeliver
|
|||
if (memory_[objref].second == MemoryStatusType::NOT_PRESENT) {
|
||||
}
|
||||
else {
|
||||
RAY_CHECK_NEQ(memory_[objref].second, MemoryStatusType::DEALLOCATED, "Objstore " << objstoreid_ << " is attempting to get objref " << objref << ", but memory_[objref] == DEALLOCATED.");
|
||||
RAY_CHECK_NEQ(memory_[objref].second, MemoryStatusType::DEALLOCATED, "Objstore " << objstoreid_ << " is attempting to get objref " << objref << ", but memory_[objref] == DEALLOCATED.");
|
||||
RAY_LOG(RAY_DEBUG, "Objstore " << objstoreid_ << " already has objref " << objref << " or it is already being shipped, so no need to pull it again.");
|
||||
return Status::OK;
|
||||
}
|
||||
|
|
|
@ -100,6 +100,11 @@ Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefs
|
|||
std::lock_guard<std::mutex> reverse_target_objrefs_lock(reverse_target_objrefs_lock_);
|
||||
reverse_target_objrefs_[target_objref].push_back(alias_objref);
|
||||
}
|
||||
{
|
||||
// The corresponding increment was done in register_new_object.
|
||||
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); // we grab this lock because decrement_ref_count assumes it has been acquired
|
||||
decrement_ref_count(std::vector<ObjRef>({alias_objref}));
|
||||
}
|
||||
schedule();
|
||||
return Status::OK;
|
||||
}
|
||||
|
@ -141,6 +146,14 @@ Status SchedulerService::ObjReady(ServerContext* context, const ObjReadyRequest*
|
|||
RAY_LOG(RAY_DEBUG, "object " << objref << " ready on store " << request->objstoreid());
|
||||
add_canonical_objref(objref);
|
||||
add_location(objref, request->objstoreid());
|
||||
{
|
||||
// If this is the first time that ObjReady has been called for this objref,
|
||||
// the corresponding increment was done in register_new_object in the
|
||||
// scheduler. For all subsequent calls to ObjReady, the corresponding
|
||||
// increment was done in deliver_object_if_necessary in the scheduler.
|
||||
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); // we grab this lock because decrement_ref_count assumes it has been acquired
|
||||
decrement_ref_count(std::vector<ObjRef>({objref}));
|
||||
}
|
||||
schedule();
|
||||
return Status::OK;
|
||||
}
|
||||
|
@ -250,6 +263,13 @@ void SchedulerService::deliver_object_if_necessary(ObjRef canonical_objref, ObjS
|
|||
void SchedulerService::deliver_object(ObjRef canonical_objref, ObjStoreId from, ObjStoreId to) {
|
||||
RAY_CHECK_NEQ(from, to, "attempting to deliver canonical_objref " << canonical_objref << " from objstore " << from << " to itself.");
|
||||
RAY_CHECK(is_canonical(canonical_objref), "attempting to deliver objref " << canonical_objref << ", but this objref is not a canonical objref.");
|
||||
{
|
||||
// We increment once so the objref doesn't go out of scope before the ObjReady
|
||||
// method is called. The corresponding decrement will happen in ObjReady in
|
||||
// the scheduler.
|
||||
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_); // we grab this lock because increment_ref_count assumes it has been acquired
|
||||
increment_ref_count(std::vector<ObjRef>({canonical_objref}));
|
||||
}
|
||||
ClientContext context;
|
||||
AckReply reply;
|
||||
StartDeliveryRequest request;
|
||||
|
@ -373,11 +393,21 @@ ObjRef SchedulerService::register_new_object() {
|
|||
reverse_target_objrefs_.push_back(std::vector<ObjRef>());
|
||||
reference_counts_.push_back(0);
|
||||
contained_objrefs_.push_back(std::vector<ObjRef>());
|
||||
{
|
||||
// We increment once so the objref doesn't go out of scope before the ObjReady
|
||||
// method is called. The corresponding decrement will happen either in
|
||||
// ObjReady in the scheduler or in AliasObjRefs in the scheduler.
|
||||
increment_ref_count(std::vector<ObjRef>({objtable_size})); // Note that reference_counts_lock_ is acquired above, as assumed by increment_ref_count
|
||||
}
|
||||
return objtable_size;
|
||||
}
|
||||
|
||||
void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstoreid) {
|
||||
// add_location must be called with a canonical objref
|
||||
{
|
||||
std::lock_guard<std::mutex> reference_counts_lock(reference_counts_lock_);
|
||||
RAY_CHECK_NEQ(reference_counts_[canonical_objref], DEALLOCATED, "Calling ObjReady with canonical_objref " << canonical_objref << ", but this objref has already been deallocated");
|
||||
}
|
||||
RAY_CHECK(is_canonical(canonical_objref), "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")");
|
||||
std::lock_guard<std::mutex> objects_lock(objects_lock_);
|
||||
RAY_CHECK_LT(canonical_objref, objtable_.size(), "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")");
|
||||
|
@ -657,7 +687,7 @@ void SchedulerService::deallocate_object(ObjRef canonical_objref) {
|
|||
decrement_ref_count(contained_objrefs_[canonical_objref]);
|
||||
}
|
||||
|
||||
void SchedulerService::increment_ref_count(std::vector<ObjRef> &objrefs) {
|
||||
void SchedulerService::increment_ref_count(const std::vector<ObjRef> &objrefs) {
|
||||
// increment_ref_count assumes that reference_counts_lock_ has been acquired already
|
||||
for (int i = 0; i < objrefs.size(); ++i) {
|
||||
ObjRef objref = objrefs[i];
|
||||
|
@ -667,7 +697,7 @@ void SchedulerService::increment_ref_count(std::vector<ObjRef> &objrefs) {
|
|||
}
|
||||
}
|
||||
|
||||
void SchedulerService::decrement_ref_count(std::vector<ObjRef> &objrefs) {
|
||||
void SchedulerService::decrement_ref_count(const std::vector<ObjRef> &objrefs) {
|
||||
// decrement_ref_count assumes that reference_counts_lock_ has been acquired already
|
||||
for (int i = 0; i < objrefs.size(); ++i) {
|
||||
ObjRef objref = objrefs[i];
|
||||
|
@ -718,19 +748,20 @@ void SchedulerService::get_equivalent_objrefs(ObjRef objref, std::vector<ObjRef>
|
|||
// This method defines the order in which locks should be acquired.
|
||||
void SchedulerService::do_on_locks(bool lock) {
|
||||
std::mutex *mutexes[] = {
|
||||
&failed_tasks_lock_,
|
||||
&pull_queue_lock_,
|
||||
&computation_graph_lock_,
|
||||
&fntable_lock_,
|
||||
&avail_workers_lock_,
|
||||
&task_queue_lock_,
|
||||
&alias_notification_queue_lock_,
|
||||
&workers_lock_,
|
||||
&reference_counts_lock_,
|
||||
&contained_objrefs_lock_,
|
||||
&workers_lock_,
|
||||
&alias_notification_queue_lock_,
|
||||
&objects_lock_,
|
||||
&objstores_lock_,
|
||||
&target_objrefs_lock_,
|
||||
&reverse_target_objrefs_lock_,
|
||||
&reverse_target_objrefs_lock_
|
||||
};
|
||||
size_t n = sizeof(mutexes) / sizeof(*mutexes);
|
||||
for (size_t i = 0; i != n; ++i) {
|
||||
|
|
|
@ -117,9 +117,9 @@ private:
|
|||
// tell all of the objstores holding canonical_objref to deallocate it
|
||||
void deallocate_object(ObjRef canonical_objref);
|
||||
// increment the ref counts for the object references in objrefs
|
||||
void increment_ref_count(std::vector<ObjRef> &objrefs);
|
||||
void increment_ref_count(const std::vector<ObjRef> &objrefs);
|
||||
// decrement the ref counts for the object references in objrefs
|
||||
void decrement_ref_count(std::vector<ObjRef> &objrefs);
|
||||
void decrement_ref_count(const std::vector<ObjRef> &objrefs);
|
||||
// Find all of the object references which are upstream of objref (including objref itself). That is, you can get from everything in objrefs to objref by repeatedly indexing in target_objrefs_.
|
||||
void upstream_objrefs(ObjRef objref, std::vector<ObjRef> &objrefs);
|
||||
// Find all of the object references that refer to the same object as objref (as best as we can determine at the moment). The information may be incomplete because not all of the aliases may be known.
|
||||
|
@ -183,8 +183,11 @@ private:
|
|||
// List of pending alias notifications. Each element consists of (objstoreid, (alias_objref, canonical_objref)).
|
||||
std::vector<std::pair<ObjStoreId, std::pair<ObjRef, ObjRef> > > alias_notification_queue_;
|
||||
std::mutex alias_notification_queue_lock_;
|
||||
// Reference counts. Currently, reference_counts_[objref] is the number of existing references
|
||||
// held to objref. This is done for all objrefs, not just canonical_objrefs. This data structure completely ignores aliasing.
|
||||
// Reference counts. Currently, reference_counts_[objref] is the number of
|
||||
// existing references held to objref. This is done for all objrefs, not just
|
||||
// canonical_objrefs. This data structure completely ignores aliasing. If the
|
||||
// object corresponding to objref has been deallocated, then
|
||||
// reference_counts[objref] will equal DEALLOCATED.
|
||||
std::vector<RefCount> reference_counts_;
|
||||
std::mutex reference_counts_lock_;
|
||||
// contained_objrefs_[objref] is a vector of all of the objrefs contained inside the object referred to by objref
|
||||
|
|
Loading…
Add table
Reference in a new issue