mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
raylet memory corruption fixes (#2591)
* raylet memory corruption fixes
* add util function to translate boost error to ray status
* tcp client connection now using ray status utility function
* lint
This commit is contained in:
parent
6ab01a2cad
commit
df7ee7ff1e
5 changed files with 22 additions and 19 deletions
|
@ -4,6 +4,7 @@
|
|||
|
||||
#include "common.h"
|
||||
#include "ray/raylet/format/node_manager_generated.h"
|
||||
#include "ray/util/util.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
|
@ -14,11 +15,7 @@ ray::Status TcpConnect(boost::asio::ip::tcp::socket &socket,
|
|||
boost::asio::ip::tcp::endpoint endpoint(ip_address, port);
|
||||
boost::system::error_code error;
|
||||
socket.connect(endpoint, error);
|
||||
if (error) {
|
||||
return ray::Status::IOError(error.message());
|
||||
} else {
|
||||
return ray::Status::OK();
|
||||
}
|
||||
return boost_to_ray_status(error);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
|
@ -83,11 +80,7 @@ ray::Status ServerConnection<T>::WriteMessage(int64_t type, int64_t length,
|
|||
// TODO(swang): Does this need to be an async write?
|
||||
boost::system::error_code error;
|
||||
WriteBuffer(message_buffers, error);
|
||||
if (error) {
|
||||
return ray::Status::IOError(error.message());
|
||||
} else {
|
||||
return ray::Status::OK();
|
||||
}
|
||||
return boost_to_ray_status(error);
|
||||
}
|
||||
|
||||
template <class T>
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
#include "ray/object_manager/object_manager.h"
|
||||
#include "ray/util/util.h"
|
||||
|
||||
namespace asio = boost::asio;
|
||||
|
||||
|
@ -332,11 +333,10 @@ ray::Status ObjectManager::SendObjectData(const ObjectID &object_id,
|
|||
buffer.push_back(asio::buffer(chunk_info.data, chunk_info.buffer_length));
|
||||
conn->WriteBuffer(buffer, ec);
|
||||
|
||||
ray::Status status = ray::Status::OK();
|
||||
if (ec.value() != 0) {
|
||||
ray::Status status = boost_to_ray_status(ec);
|
||||
if (ec.value() != boost::system::errc::success) {
|
||||
// Push failed. Deal with partial objects on the receiving end.
|
||||
// TODO(hme): Try to invoke disconnect on sender connection, then remove it.
|
||||
status = ray::Status::IOError(ec.message());
|
||||
}
|
||||
|
||||
// Do this regardless of whether it failed or succeeded.
|
||||
|
@ -626,7 +626,7 @@ void ObjectManager::ExecuteReceiveObject(const ClientID &client_id,
|
|||
buffer.push_back(asio::buffer(chunk_info.data, chunk_info.buffer_length));
|
||||
boost::system::error_code ec;
|
||||
conn.ReadBuffer(buffer, ec);
|
||||
if (ec.value() == 0) {
|
||||
if (ec.value() == boost::system::errc::success) {
|
||||
buffer_pool_.SealChunk(object_id, chunk_index);
|
||||
} else {
|
||||
buffer_pool_.AbortCreateChunk(object_id, chunk_index);
|
||||
|
@ -643,8 +643,8 @@ void ObjectManager::ExecuteReceiveObject(const ClientID &client_id,
|
|||
buffer.push_back(asio::buffer(mutable_vec, buffer_length));
|
||||
boost::system::error_code ec;
|
||||
conn.ReadBuffer(buffer, ec);
|
||||
if (ec.value() != 0) {
|
||||
RAY_LOG(ERROR) << ec.message();
|
||||
if (ec.value() != boost::system::errc::success) {
|
||||
RAY_LOG(ERROR) << boost_to_ray_status(ec).ToString();
|
||||
}
|
||||
// TODO(hme): If the object isn't local, create a pull request for this chunk.
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#include "common/common_protocol.h"
|
||||
|
||||
#include "ray/object_manager/object_store_notification_manager.h"
|
||||
#include "ray/util/util.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
|
@ -46,8 +47,8 @@ void ObjectStoreNotificationManager::ProcessStoreLength(
|
|||
|
||||
void ObjectStoreNotificationManager::ProcessStoreNotification(
|
||||
const boost::system::error_code &error) {
|
||||
if (error) {
|
||||
RAY_LOG(FATAL) << error.message();
|
||||
if (error.value() != boost::system::errc::success) {
|
||||
RAY_LOG(FATAL) << boost_to_ray_status(error).ToString();
|
||||
}
|
||||
|
||||
const auto &object_info = flatbuffers::GetRoot<ObjectInfo>(notification_.data());
|
||||
|
|
|
@ -160,12 +160,12 @@ void ReconstructionPolicy::Cancel(const ObjectID &object_id) {
|
|||
// If there are no more needed objects created by this task, stop listening
|
||||
// for notifications.
|
||||
if (it->second.created_objects.empty()) {
|
||||
listening_tasks_.erase(it);
|
||||
// Cancel notifications for the task lease if we were subscribed to them.
|
||||
if (it->second.subscribed) {
|
||||
RAY_CHECK_OK(
|
||||
task_lease_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_));
|
||||
}
|
||||
listening_tasks_.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -26,4 +26,13 @@ inline int64_t current_sys_time_ms() {
|
|||
return ms_since_epoch.count();
|
||||
}
|
||||
|
||||
/// Translate a Boost error code into a ray::Status.
///
/// \param error The Boost error code to translate.
/// \return ray::Status::OK() when the code's value equals
/// boost::system::errc::success; otherwise a ray::Status::IOError carrying
/// the error's message.
inline ray::Status boost_to_ray_status(const boost::system::error_code &error) {
  if (error.value() == boost::system::errc::success) {
    return ray::Status::OK();
  }
  // Ask the error code for its own message instead of calling strerror() on
  // the raw value: the value is only an errno for system-category errors, so
  // strerror() would report an unrelated message for codes that belong to
  // other categories. This also avoids strerror()'s shared static buffer.
  return ray::Status::IOError(error.message());
}
|
||||
|
||||
#endif // RAY_UTIL_UTIL_H
|
||||
|
|
Loading…
Add table
Reference in a new issue