mirror of
https://github.com/vale981/ray
synced 2025-03-07 02:51:39 -05:00
[Core] Fix plasma store segfault (#15071)
* Use shared pointer instead of a raw pointer * Lint. * Addressed code review. * Addressed code review.g
This commit is contained in:
parent
d8f8583e80
commit
015369db34
2 changed files with 35 additions and 11 deletions
|
@ -89,17 +89,33 @@ struct GetRequest {
|
|||
|
||||
void AsyncWait(int64_t timeout_ms,
|
||||
std::function<void(const boost::system::error_code &)> on_timeout) {
|
||||
RAY_CHECK(!is_removed_);
|
||||
// Set an expiry time relative to now.
|
||||
timer_.expires_from_now(std::chrono::milliseconds(timeout_ms));
|
||||
timer_.async_wait(on_timeout);
|
||||
}
|
||||
|
||||
void CancelTimer() { timer_.cancel(); }
|
||||
void CancelTimer() {
|
||||
RAY_CHECK(!is_removed_);
|
||||
timer_.cancel();
|
||||
}
|
||||
|
||||
/// Mark that the get request is removed.
|
||||
void MarkRemoved() {
|
||||
RAY_CHECK(!is_removed_);
|
||||
is_removed_ = true;
|
||||
}
|
||||
|
||||
bool IsRemoved() const { return is_removed_; }
|
||||
|
||||
private:
|
||||
/// The timer that will time out and cause this wait to return to
|
||||
/// the client if it hasn't already returned.
|
||||
boost::asio::steady_timer timer_;
|
||||
/// Whether or not if this get request is removed.
|
||||
/// Once the get request is removed, any operation on top of the get request shouldn't
|
||||
/// happen.
|
||||
bool is_removed_ = false;
|
||||
};
|
||||
|
||||
GetRequest::GetRequest(instrumented_io_context &io_context,
|
||||
|
@ -345,7 +361,7 @@ void PlasmaObject_init(PlasmaObject *object, ObjectTableEntry *entry) {
|
|||
object->device_num = entry->device_num;
|
||||
}
|
||||
|
||||
void PlasmaStore::RemoveGetRequest(GetRequest *get_request) {
|
||||
void PlasmaStore::RemoveGetRequest(const std::shared_ptr<GetRequest> &get_request) {
|
||||
// Remove the get request from each of the relevant object_get_requests hash
|
||||
// tables if it is present there. It should only be present there if the get
|
||||
// request timed out or if it was issued by a client that has disconnected.
|
||||
|
@ -366,13 +382,13 @@ void PlasmaStore::RemoveGetRequest(GetRequest *get_request) {
|
|||
}
|
||||
// Remove the get request.
|
||||
get_request->CancelTimer();
|
||||
delete get_request;
|
||||
get_request->MarkRemoved();
|
||||
}
|
||||
|
||||
void PlasmaStore::RemoveGetRequestsForClient(const std::shared_ptr<Client> &client) {
|
||||
std::unordered_set<GetRequest *> get_requests_to_remove;
|
||||
std::unordered_set<std::shared_ptr<GetRequest>> get_requests_to_remove;
|
||||
for (auto const &pair : object_get_requests_) {
|
||||
for (GetRequest *get_request : pair.second) {
|
||||
for (const auto &get_request : pair.second) {
|
||||
if (get_request->client == client) {
|
||||
get_requests_to_remove.insert(get_request);
|
||||
}
|
||||
|
@ -382,12 +398,18 @@ void PlasmaStore::RemoveGetRequestsForClient(const std::shared_ptr<Client> &clie
|
|||
// It shouldn't be possible for a given client to be in the middle of multiple get
|
||||
// requests.
|
||||
RAY_CHECK(get_requests_to_remove.size() <= 1);
|
||||
for (GetRequest *get_request : get_requests_to_remove) {
|
||||
for (const auto &get_request : get_requests_to_remove) {
|
||||
RemoveGetRequest(get_request);
|
||||
}
|
||||
}
|
||||
|
||||
void PlasmaStore::ReturnFromGet(GetRequest *get_req) {
|
||||
void PlasmaStore::ReturnFromGet(const std::shared_ptr<GetRequest> &get_req) {
|
||||
// If the get request is already removed, do no-op. This can happen because the boost
|
||||
// timer is not atomic. See https://github.com/ray-project/ray/pull/15071.
|
||||
if (get_req->IsRemoved()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Figure out how many file descriptors we need to send.
|
||||
std::unordered_set<MEMFD_TYPE> fds_to_send;
|
||||
std::vector<MEMFD_TYPE> store_fds;
|
||||
|
@ -476,7 +498,8 @@ void PlasmaStore::ProcessGetRequest(const std::shared_ptr<Client> &client,
|
|||
const std::vector<ObjectID> &object_ids,
|
||||
int64_t timeout_ms, bool is_from_worker) {
|
||||
// Create a get request for this object.
|
||||
auto get_req = new GetRequest(io_context_, client, object_ids, is_from_worker);
|
||||
auto get_req = std::make_shared<GetRequest>(
|
||||
GetRequest(io_context_, client, object_ids, is_from_worker));
|
||||
for (auto object_id : object_ids) {
|
||||
// Check if this object is already present
|
||||
// locally. If so, record that the object is being used and mark it as accounted for.
|
||||
|
|
|
@ -238,14 +238,14 @@ class PlasmaStore {
|
|||
/// Remove a GetRequest and clean up the relevant data structures.
|
||||
///
|
||||
/// \param get_request The GetRequest to remove.
|
||||
void RemoveGetRequest(GetRequest *get_request);
|
||||
void RemoveGetRequest(const std::shared_ptr<GetRequest> &get_request);
|
||||
|
||||
/// Remove all of the GetRequests for a given client.
|
||||
///
|
||||
/// \param client The client whose GetRequests should be removed.
|
||||
void RemoveGetRequestsForClient(const std::shared_ptr<Client> &client);
|
||||
|
||||
void ReturnFromGet(GetRequest *get_req);
|
||||
void ReturnFromGet(const std::shared_ptr<GetRequest> &get_req);
|
||||
|
||||
void UpdateObjectGetRequests(const ObjectID &object_id);
|
||||
|
||||
|
@ -277,7 +277,8 @@ class PlasmaStore {
|
|||
QuotaAwarePolicy eviction_policy_;
|
||||
/// A hash table mapping object IDs to a vector of the get requests that are
|
||||
/// waiting for the object to arrive.
|
||||
std::unordered_map<ObjectID, std::vector<GetRequest *>> object_get_requests_;
|
||||
std::unordered_map<ObjectID, std::vector<std::shared_ptr<GetRequest>>>
|
||||
object_get_requests_;
|
||||
|
||||
std::unordered_set<ObjectID> deletion_cache_;
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue