From df7ee7ff1eaba5afbfa8e84c47b5a727a66f5ec7 Mon Sep 17 00:00:00 2001 From: Alexey Tumanov Date: Wed, 8 Aug 2018 19:50:43 -0700 Subject: [PATCH] raylet memory corruption fixes (#2591) * raylet memory corruption fixes * add util function to translate boost error to ray status * tcp client connection now using ray status utility function * lint --- src/ray/common/client_connection.cc | 13 +++---------- src/ray/object_manager/object_manager.cc | 12 ++++++------ .../object_store_notification_manager.cc | 5 +++-- src/ray/raylet/reconstruction_policy.cc | 2 +- src/ray/util/util.h | 9 +++++++++ 5 files changed, 22 insertions(+), 19 deletions(-) diff --git a/src/ray/common/client_connection.cc b/src/ray/common/client_connection.cc index fe6f942a8..04daa2067 100644 --- a/src/ray/common/client_connection.cc +++ b/src/ray/common/client_connection.cc @@ -4,6 +4,7 @@ #include "common.h" #include "ray/raylet/format/node_manager_generated.h" +#include "ray/util/util.h" namespace ray { @@ -14,11 +15,7 @@ ray::Status TcpConnect(boost::asio::ip::tcp::socket &socket, boost::asio::ip::tcp::endpoint endpoint(ip_address, port); boost::system::error_code error; socket.connect(endpoint, error); - if (error) { - return ray::Status::IOError(error.message()); - } else { - return ray::Status::OK(); - } + return boost_to_ray_status(error); } template @@ -83,11 +80,7 @@ ray::Status ServerConnection::WriteMessage(int64_t type, int64_t length, // TODO(swang): Does this need to be an async write? 
boost::system::error_code error; WriteBuffer(message_buffers, error); - if (error) { - return ray::Status::IOError(error.message()); - } else { - return ray::Status::OK(); - } + return boost_to_ray_status(error); } template diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 413d25e46..32099d67b 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -1,4 +1,5 @@ #include "ray/object_manager/object_manager.h" +#include "ray/util/util.h" namespace asio = boost::asio; @@ -332,11 +333,10 @@ ray::Status ObjectManager::SendObjectData(const ObjectID &object_id, buffer.push_back(asio::buffer(chunk_info.data, chunk_info.buffer_length)); conn->WriteBuffer(buffer, ec); - ray::Status status = ray::Status::OK(); - if (ec.value() != 0) { + ray::Status status = boost_to_ray_status(ec); + if (ec.value() != boost::system::errc::success) { // Push failed. Deal with partial objects on the receiving end. // TODO(hme): Try to invoke disconnect on sender connection, then remove it. - status = ray::Status::IOError(ec.message()); } // Do this regardless of whether it failed or succeeded. 
@@ -626,7 +626,7 @@ void ObjectManager::ExecuteReceiveObject(const ClientID &client_id, buffer.push_back(asio::buffer(chunk_info.data, chunk_info.buffer_length)); boost::system::error_code ec; conn.ReadBuffer(buffer, ec); - if (ec.value() == 0) { + if (ec.value() == boost::system::errc::success) { buffer_pool_.SealChunk(object_id, chunk_index); } else { buffer_pool_.AbortCreateChunk(object_id, chunk_index); @@ -643,8 +643,8 @@ void ObjectManager::ExecuteReceiveObject(const ClientID &client_id, buffer.push_back(asio::buffer(mutable_vec, buffer_length)); boost::system::error_code ec; conn.ReadBuffer(buffer, ec); - if (ec.value() != 0) { - RAY_LOG(ERROR) << ec.message(); + if (ec.value() != boost::system::errc::success) { + RAY_LOG(ERROR) << boost_to_ray_status(ec).ToString(); } // TODO(hme): If the object isn't local, create a pull request for this chunk. } diff --git a/src/ray/object_manager/object_store_notification_manager.cc b/src/ray/object_manager/object_store_notification_manager.cc index 874a222bc..4d7c401ee 100644 --- a/src/ray/object_manager/object_store_notification_manager.cc +++ b/src/ray/object_manager/object_store_notification_manager.cc @@ -9,6 +9,7 @@ #include "common/common_protocol.h" #include "ray/object_manager/object_store_notification_manager.h" +#include "ray/util/util.h" namespace ray { @@ -46,8 +47,8 @@ void ObjectStoreNotificationManager::ProcessStoreLength( void ObjectStoreNotificationManager::ProcessStoreNotification( const boost::system::error_code &error) { - if (error) { - RAY_LOG(FATAL) << error.message(); + if (error.value() != boost::system::errc::success) { + RAY_LOG(FATAL) << boost_to_ray_status(error).ToString(); } const auto &object_info = flatbuffers::GetRoot(notification_.data()); diff --git a/src/ray/raylet/reconstruction_policy.cc b/src/ray/raylet/reconstruction_policy.cc index e0e175b47..89632d546 100644 --- a/src/ray/raylet/reconstruction_policy.cc +++ b/src/ray/raylet/reconstruction_policy.cc @@ -160,12 +160,12 @@ void 
ReconstructionPolicy::Cancel(const ObjectID &object_id) {
   // If there are no more needed objects created by this task, stop listening
   // for notifications.
   if (it->second.created_objects.empty()) {
-    listening_tasks_.erase(it);
     // Cancel notifications for the task lease if we were subscribed to them.
     if (it->second.subscribed) {
       RAY_CHECK_OK(
           task_lease_pubsub_.CancelNotifications(JobID::nil(), task_id, client_id_));
     }
+    listening_tasks_.erase(it);
   }
 }
 
diff --git a/src/ray/util/util.h b/src/ray/util/util.h
index 6bf4a165a..7e67210d7 100644
--- a/src/ray/util/util.h
+++ b/src/ray/util/util.h
@@ -26,4 +26,13 @@ inline int64_t current_sys_time_ms() {
   return ms_since_epoch.count();
 }
 
+// Translate a boost error code into a ray::Status; no error maps to OK. The
+// text comes from error.message(), which honors the code's error category.
+inline ray::Status boost_to_ray_status(const boost::system::error_code &error) {
+  if (error.value() == boost::system::errc::success) {
+    return ray::Status::OK();
+  }
+  return ray::Status::IOError(error.message());
+}
+
 #endif // RAY_UTIL_UTIL_H