Enable timeline visualizations of object transfers. (#3255)

* Plot object transfers.

* Linting
This commit is contained in:
Robert Nishihara 2018-11-07 12:45:59 -08:00 committed by Philipp Moritz
parent 4182b85611
commit 1dd5d92789
9 changed files with 431 additions and 67 deletions

View file

@ -584,46 +584,14 @@ class GlobalState(object):
for component_id in component_identifiers_binary for component_id in component_identifiers_binary
} }
def chrome_tracing_dump(self, def _seconds_to_microseconds(self, time_in_seconds):
include_task_data=False, """A helper function for converting seconds to microseconds."""
filename=None, time_in_microseconds = 10**6 * time_in_seconds
open_browser=False): return time_in_microseconds
"""Return a list of profiling events that can viewed as a timeline.
To view this information as a timeline, simply dump it as a json file
using json.dumps, and then load go to chrome://tracing in the Chrome
web browser and load the dumped file. Make sure to enable "Flow events"
in the "View Options" menu.
Args:
include_task_data: If true, we will include more task metadata such
as the task specifications in the json.
filename: If a filename is provided, the timeline is dumped to that
file.
open_browser: If true, we will attempt to automatically open the
timeline visualization in Chrome.
Returns:
If filename is not provided, this returns a list of profiling
events. Each profile event is a dictionary.
"""
# TODO(rkn): Support including the task specification data in the
# timeline.
# TODO(rkn): This should support viewing just a window of time or a
# limited number of events.
if include_task_data:
raise NotImplementedError("This flag has not been implented yet.")
if open_browser:
raise NotImplementedError("This flag has not been implented yet.")
profile_table = self.profile_table()
all_events = []
# Colors are specified at # Colors are specified at
# https://github.com/catapult-project/catapult/blob/master/tracing/tracing/base/color_scheme.html. # noqa: E501 # https://github.com/catapult-project/catapult/blob/master/tracing/tracing/base/color_scheme.html. # noqa: E501
default_color_mapping = defaultdict( _default_color_mapping = defaultdict(
lambda: "generic_work", { lambda: "generic_work", {
"worker_idle": "cq_build_abandoned", "worker_idle": "cq_build_abandoned",
"task": "rail_response", "task": "rail_response",
@ -639,11 +607,74 @@ class GlobalState(object):
"register_remote_function": "detailed_memory_dump", "register_remote_function": "detailed_memory_dump",
}) })
def seconds_to_microseconds(time_in_seconds): # These colors are for use in Chrome tracing.
time_in_microseconds = 10**6 * time_in_seconds _chrome_tracing_colors = [
return time_in_microseconds "thread_state_uninterruptible",
"thread_state_iowait",
"thread_state_running",
"thread_state_runnable",
"thread_state_sleeping",
"thread_state_unknown",
"background_memory_dump",
"light_memory_dump",
"detailed_memory_dump",
"vsync_highlight_color",
"generic_work",
"good",
"bad",
"terrible",
# "black",
# "grey",
# "white",
"yellow",
"olive",
"rail_response",
"rail_animation",
"rail_idle",
"rail_load",
"startup",
"heap_dump_stack_frame",
"heap_dump_object_type",
"heap_dump_child_node_arrow",
"cq_build_running",
"cq_build_passed",
"cq_build_failed",
"cq_build_abandoned",
"cq_build_attempt_runnig",
"cq_build_attempt_passed",
"cq_build_attempt_failed",
]
def chrome_tracing_dump(self, filename=None):
"""Return a list of profiling events that can be viewed as a timeline.
To view this information as a timeline, simply dump it as a json file
by passing in "filename" or using json.dump, and then go to
chrome://tracing in the Chrome web browser and load the dumped file.
Make sure to enable "Flow events" in the "View Options" menu.
Args:
filename: If a filename is provided, the timeline is dumped to that
file.
Returns:
If filename is not provided, this returns a list of profiling
events. Each profile event is a dictionary.
"""
# TODO(rkn): Support including the task specification data in the
# timeline.
# TODO(rkn): This should support viewing just a window of time or a
# limited number of events.
profile_table = self.profile_table()
all_events = []
for component_id_hex, component_events in profile_table.items(): for component_id_hex, component_events in profile_table.items():
# Only consider workers and drivers.
component_type = component_events[0]["component_type"]
if component_type not in ["worker", "driver"]:
continue
for event in component_events: for event in component_events:
new_event = { new_event = {
# The category of the event. # The category of the event.
@ -657,14 +688,14 @@ class GlobalState(object):
"tid": event["component_type"] + ":" + "tid": event["component_type"] + ":" +
event["component_id"], event["component_id"],
# The start time in microseconds. # The start time in microseconds.
"ts": seconds_to_microseconds(event["start_time"]), "ts": self._seconds_to_microseconds(event["start_time"]),
# The duration in microseconds. # The duration in microseconds.
"dur": seconds_to_microseconds(event["end_time"] - "dur": self._seconds_to_microseconds(event["end_time"] -
event["start_time"]), event["start_time"]),
# What is this? # What is this?
"ph": "X", "ph": "X",
# This is the name of the color to display the box in. # This is the name of the color to display the box in.
"cname": default_color_mapping[event["event_type"]], "cname": self._default_color_mapping[event["event_type"]],
# The extra user-defined data. # The extra user-defined data.
"args": event["extra_data"], "args": event["extra_data"],
} }
@ -684,6 +715,97 @@ class GlobalState(object):
else: else:
return all_events return all_events
def chrome_tracing_object_transfer_dump(self, filename=None):
    """Return a list of transfer events that can be viewed as a timeline.

    To view this information as a timeline, simply dump it as a json file
    by passing in "filename" or using json.dump, and then go to
    chrome://tracing in the Chrome web browser and load the dumped file.
    Make sure to enable "Flow events" in the "View Options" menu.

    Args:
        filename: If a filename is provided, the timeline is dumped to that
            file.

    Returns:
        If filename is not provided, this returns a list of profiling
            events. Each profile event is a dictionary. If filename is
            provided, nothing is returned.

    Raises:
        AssertionError: If an object manager reported an event whose type
            is not one of "transfer_send", "transfer_receive", or
            "receive_pull_request".
    """
    # Label timeline rows with "address:port" rather than raw client IDs.
    client_id_to_address = {
        client_info["ClientID"]: "{}:{}".format(
            client_info["NodeManagerAddress"],
            client_info["ObjectManagerPort"])
        for client_info in self.client_table()
    }

    # Sends and receives get a second box in a fixed color so that the
    # direction of the transfer is visible at a glance.
    direction_colors = {
        "transfer_send": "black",
        "transfer_receive": "grey",
    }

    all_events = []

    for key, items in self.profile_table().items():
        # Only consider object manager events. A component that has not
        # reported any events yet has nothing to plot.
        if not items or items[0]["component_type"] != "object_manager":
            continue

        for event in items:
            event_type = event["event_type"]
            if event_type in ("transfer_send", "transfer_receive"):
                object_id, remote_client_id, _, _ = event["extra_data"]
            elif event_type == "receive_pull_request":
                object_id, remote_client_id = event["extra_data"]
            else:
                # Raise explicitly instead of using "assert False": assert
                # statements are removed under "python -O", which would let
                # the loop continue with unbound or stale IDs.
                raise AssertionError("This should be unreachable.")

            # Choose a color by reading the first couple of hex digits of
            # the object ID as an integer and turning that into a color.
            object_id_int = int(object_id[:2], 16)
            color = self._chrome_tracing_colors[object_id_int % len(
                self._chrome_tracing_colors)]

            new_event = {
                # The category of the event.
                "cat": event_type,
                # The string displayed on the event.
                "name": event_type,
                # The identifier for the group of rows that the event
                # appears in.
                "pid": client_id_to_address[key],
                # The identifier for the row that the event appears in.
                "tid": client_id_to_address[remote_client_id],
                # The start time in microseconds.
                "ts": self._seconds_to_microseconds(event["start_time"]),
                # The duration in microseconds.
                "dur": self._seconds_to_microseconds(event["end_time"] -
                                                     event["start_time"]),
                # The event phase. In the Chrome trace event format, "X"
                # marks a complete event (one with a start and a duration).
                "ph": "X",
                # This is the name of the color to display the box in.
                "cname": color,
                # The extra user-defined data.
                "args": event["extra_data"],
            }
            all_events.append(new_event)

            # Add another box with a color indicating whether it was a send
            # or a receive event.
            direction_color = direction_colors.get(event_type)
            if direction_color is not None:
                additional_event = new_event.copy()
                additional_event["cname"] = direction_color
                all_events.append(additional_event)

    if filename is not None:
        with open(filename, "w") as outfile:
            json.dump(all_events, outfile)
    else:
        return all_events
def dump_catapult_trace(self, def dump_catapult_trace(self,
path, path,
task_info, task_info,

View file

@ -233,12 +233,12 @@ void ClientConnection<T>::ProcessMessage(const boost::system::error_code &error)
read_type_ = static_cast<int64_t>(protocol::MessageType::DisconnectClient); read_type_ = static_cast<int64_t>(protocol::MessageType::DisconnectClient);
} }
uint64_t start_ms = current_time_ms(); int64_t start_ms = current_time_ms();
message_handler_(shared_ClientConnection_from_this(), read_type_, read_message_.data()); message_handler_(shared_ClientConnection_from_this(), read_type_, read_message_.data());
uint64_t interval = current_time_ms() - start_ms; int64_t interval = current_time_ms() - start_ms;
if (interval > RayConfig::instance().handler_warning_timeout_ms()) { if (interval > RayConfig::instance().handler_warning_timeout_ms()) {
RAY_LOG(WARNING) << "[" << debug_label_ << "]ProcessMessage with type " << read_type_ RAY_LOG(WARNING) << "[" << debug_label_ << "]ProcessMessage with type " << read_type_
<< " took " << interval << " ms "; << " took " << interval << " ms.";
} }
} }

View file

@ -283,6 +283,9 @@ void ObjectManager::PullEstablishConnection(const ObjectID &object_id,
void ObjectManager::PullSendRequest(const ObjectID &object_id, void ObjectManager::PullSendRequest(const ObjectID &object_id,
std::shared_ptr<SenderConnection> &conn) { std::shared_ptr<SenderConnection> &conn) {
// TODO(rkn): This would be a natural place to record a profile event
// indicating that a pull request was sent.
flatbuffers::FlatBufferBuilder fbb; flatbuffers::FlatBufferBuilder fbb;
auto message = object_manager_protocol::CreatePullRequestMessage( auto message = object_manager_protocol::CreatePullRequestMessage(
fbb, fbb.CreateString(client_id_.binary()), fbb.CreateString(object_id.binary())); fbb, fbb.CreateString(client_id_.binary()), fbb.CreateString(object_id.binary()));
@ -311,6 +314,46 @@ void ObjectManager::HandlePushTaskTimeout(const ObjectID &object_id,
} }
} }
void ObjectManager::HandleSendFinished(const ObjectID &object_id,
const ClientID &client_id, uint64_t chunk_index,
double start_time, double end_time,
ray::Status status) {
if (!status.ok()) {
// TODO(rkn): What do we want to do if the send failed?
}
ProfileEventT profile_event;
profile_event.event_type = "transfer_send";
profile_event.start_time = start_time;
profile_event.end_time = end_time;
// Encode the object ID, client ID, chunk index, and status as a json list,
// which will be parsed by the reader of the profile table.
profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"," +
std::to_string(chunk_index) + ",\"" + status.ToString() +
"\"]";
profile_events_.push_back(profile_event);
}
void ObjectManager::HandleReceiveFinished(const ObjectID &object_id,
const ClientID &client_id, uint64_t chunk_index,
double start_time, double end_time,
ray::Status status) {
if (!status.ok()) {
// TODO(rkn): What do we want to do if the send failed?
}
ProfileEventT profile_event;
profile_event.event_type = "transfer_receive";
profile_event.start_time = start_time;
profile_event.end_time = end_time;
// Encode the object ID, client ID, chunk index, and status as a json list,
// which will be parsed by the reader of the profile table.
profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"," +
std::to_string(chunk_index) + ",\"" + status.ToString() +
"\"]";
profile_events_.push_back(profile_event);
}
void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) { void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
if (local_objects_.count(object_id) == 0) { if (local_objects_.count(object_id) == 0) {
// Avoid setting duplicated timer for the same object and client pair. // Avoid setting duplicated timer for the same object and client pair.
@ -355,12 +398,21 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) { for (uint64_t chunk_index = 0; chunk_index < num_chunks; ++chunk_index) {
send_service_.post([this, client_id, object_id, data_size, metadata_size, send_service_.post([this, client_id, object_id, data_size, metadata_size,
chunk_index, connection_info]() { chunk_index, connection_info]() {
double start_time = current_sys_time_seconds();
// NOTE: When this callback executes, it's possible that the object // NOTE: When this callback executes, it's possible that the object
// will have already been evicted. It's also possible that the // will have already been evicted. It's also possible that the
// object could be in the process of being transferred to this // object could be in the process of being transferred to this
// object manager from another object manager. // object manager from another object manager.
ExecuteSendObject(client_id, object_id, data_size, metadata_size, chunk_index, ray::Status status = ExecuteSendObject(
connection_info); client_id, object_id, data_size, metadata_size, chunk_index, connection_info);
// Notify the main thread that we have finished sending the chunk.
main_service_->post(
[this, object_id, client_id, chunk_index, start_time, status]() {
double end_time = current_sys_time_seconds();
HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time,
status);
});
}); });
} }
} else { } else {
@ -370,8 +422,8 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
} }
} }
void ObjectManager::ExecuteSendObject(const ClientID &client_id, ray::Status ObjectManager::ExecuteSendObject(
const ObjectID &object_id, uint64_t data_size, const ClientID &client_id, const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index, uint64_t metadata_size, uint64_t chunk_index,
const RemoteConnectionInfo &connection_info) { const RemoteConnectionInfo &connection_info) {
RAY_LOG(DEBUG) << "ExecuteSendObject " << client_id << " " << object_id << " " RAY_LOG(DEBUG) << "ExecuteSendObject " << client_id << " " << object_id << " "
@ -390,6 +442,7 @@ void ObjectManager::ExecuteSendObject(const ClientID &client_id,
CheckIOError(status, "Push"); CheckIOError(status, "Push");
} }
} }
return status;
} }
ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id, ray::Status ObjectManager::SendObjectHeaders(const ObjectID &object_id,
@ -709,6 +762,14 @@ void ObjectManager::ReceivePullRequest(std::shared_ptr<TcpClientConnection> &con
auto pr = flatbuffers::GetRoot<object_manager_protocol::PullRequestMessage>(message); auto pr = flatbuffers::GetRoot<object_manager_protocol::PullRequestMessage>(message);
ObjectID object_id = ObjectID::from_binary(pr->object_id()->str()); ObjectID object_id = ObjectID::from_binary(pr->object_id()->str());
ClientID client_id = ClientID::from_binary(pr->client_id()->str()); ClientID client_id = ClientID::from_binary(pr->client_id()->str());
ProfileEventT profile_event;
profile_event.event_type = "receive_pull_request";
profile_event.start_time = current_sys_time_seconds();
profile_event.end_time = profile_event.start_time;
profile_event.extra_data = "[\"" + object_id.hex() + "\",\"" + client_id.hex() + "\"]";
profile_events_.push_back(profile_event);
Push(object_id, client_id); Push(object_id, client_id);
conn->ProcessMessages(); conn->ProcessMessages();
} }
@ -718,20 +779,28 @@ void ObjectManager::ReceivePushRequest(std::shared_ptr<TcpClientConnection> &con
// Serialize. // Serialize.
auto object_header = auto object_header =
flatbuffers::GetRoot<object_manager_protocol::PushRequestMessage>(message); flatbuffers::GetRoot<object_manager_protocol::PushRequestMessage>(message);
ObjectID object_id = ObjectID::from_binary(object_header->object_id()->str()); const ObjectID object_id = ObjectID::from_binary(object_header->object_id()->str());
uint64_t chunk_index = object_header->chunk_index(); uint64_t chunk_index = object_header->chunk_index();
uint64_t data_size = object_header->data_size(); uint64_t data_size = object_header->data_size();
uint64_t metadata_size = object_header->metadata_size(); uint64_t metadata_size = object_header->metadata_size();
receive_service_.post([this, object_id, data_size, metadata_size, chunk_index, conn]() { receive_service_.post([this, object_id, data_size, metadata_size, chunk_index, conn]() {
ExecuteReceiveObject(conn->GetClientID(), object_id, data_size, metadata_size, double start_time = current_sys_time_seconds();
const ClientID client_id = conn->GetClientID();
auto status = ExecuteReceiveObject(client_id, object_id, data_size, metadata_size,
chunk_index, *conn); chunk_index, *conn);
// Notify the main thread that we have finished receiving the object.
main_service_->post([this, object_id, client_id, chunk_index, start_time, status]() {
double end_time = current_sys_time_seconds();
HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time,
status);
});
}); });
} }
void ObjectManager::ExecuteReceiveObject(const ClientID &client_id, ray::Status ObjectManager::ExecuteReceiveObject(
const ObjectID &object_id, uint64_t data_size, const ClientID &client_id, const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index, uint64_t metadata_size, uint64_t chunk_index, TcpClientConnection &conn) {
TcpClientConnection &conn) {
RAY_LOG(DEBUG) << "ExecuteReceiveObject " << client_id << " " << object_id << " " RAY_LOG(DEBUG) << "ExecuteReceiveObject " << client_id << " " << object_id << " "
<< chunk_index; << chunk_index;
@ -769,6 +838,8 @@ void ObjectManager::ExecuteReceiveObject(const ClientID &client_id,
conn.ProcessMessages(); conn.ProcessMessages();
RAY_LOG(DEBUG) << "ReceiveCompleted " << client_id_ << " " << object_id << " " RAY_LOG(DEBUG) << "ReceiveCompleted " << client_id_ << " " << object_id << " "
<< "/" << config_.max_receives; << "/" << config_.max_receives;
return chunk_status.second;
} }
void ObjectManager::ReceiveFreeRequest(std::shared_ptr<TcpClientConnection> &conn, void ObjectManager::ReceiveFreeRequest(std::shared_ptr<TcpClientConnection> &conn,
@ -820,4 +891,18 @@ void ObjectManager::SpreadFreeObjectRequest(const std::vector<ObjectID> &object_
} }
} }
/// Package every profile event accumulated so far into a ProfileTableDataT
/// tagged with this object manager's client ID, and clear the accumulated
/// events.
///
/// \return The batch of events recorded since the previous call.
ProfileTableDataT ObjectManager::GetAndResetProfilingInfo() {
  ProfileTableDataT profile_info;
  profile_info.component_type = "object_manager";
  profile_info.component_id = client_id_.binary();

  // The accumulated events are discarded right after this loop, so move each
  // one into the batch instead of deep-copying its strings.
  profile_info.profile_events.reserve(profile_events_.size());
  for (auto &profile_event : profile_events_) {
    profile_info.profile_events.emplace_back(
        new ProfileEventT(std::move(profile_event)));
  }

  profile_events_.clear();

  return profile_info;
}
} // namespace ray } // namespace ray

View file

@ -175,6 +175,12 @@ class ObjectManager : public ObjectManagerInterface {
/// or send it to all the object stores. /// or send it to all the object stores.
void FreeObjects(const std::vector<ObjectID> &object_ids, bool local_only); void FreeObjects(const std::vector<ObjectID> &object_ids, bool local_only);
/// Return profiling information and reset the profiling information.
///
/// \return All profiling information that has accumulated since the last call
/// to this method.
ProfileTableDataT GetAndResetProfilingInfo();
private: private:
friend class TestObjectManager; friend class TestObjectManager;
@ -252,17 +258,55 @@ class ObjectManager : public ObjectManagerInterface {
/// Asynchronously send a pull request via remote object manager connection. /// Asynchronously send a pull request via remote object manager connection.
/// Executes on main_service_ thread. /// Executes on main_service_ thread.
///
/// \param object_id The ID of the object request.
/// \param conn The connection to the remote object manager.
/// \return Void.
void PullSendRequest(const ObjectID &object_id, void PullSendRequest(const ObjectID &object_id,
std::shared_ptr<SenderConnection> &conn); std::shared_ptr<SenderConnection> &conn);
std::shared_ptr<SenderConnection> CreateSenderConnection( std::shared_ptr<SenderConnection> CreateSenderConnection(
ConnectionPool::ConnectionType type, RemoteConnectionInfo info); ConnectionPool::ConnectionType type, RemoteConnectionInfo info);
/// This is used to notify the main thread that the sending of a chunk has
/// completed.
///
/// \param object_id The ID of the object that was sent.
/// \param client_id The ID of the client that the chunk was sent to.
/// \param chunk_index The index of the chunk.
/// \param start_time The time, in seconds, when the object manager began
/// sending the chunk.
/// \param end_time The time, in seconds, when the object manager finished
/// sending the chunk.
/// \param status The status of the send (e.g., did it succeed or fail).
/// \return Void.
void HandleSendFinished(const ObjectID &object_id, const ClientID &client_id,
                        uint64_t chunk_index, double start_time, double end_time,
                        ray::Status status);
/// This is used to notify the main thread that the receiving of a chunk has
/// completed.
///
/// \param object_id The ID of the object that was received.
/// \param client_id The ID of the client that the chunk was received from.
/// \param chunk_index The index of the chunk.
/// \param start_time The time, in seconds, when the object manager began
/// receiving the chunk.
/// \param end_time The time, in seconds, when the object manager finished
/// receiving the chunk.
/// \param status The status of the receive (e.g., did it succeed or fail).
/// \return Void.
void HandleReceiveFinished(const ObjectID &object_id, const ClientID &client_id,
                           uint64_t chunk_index, double start_time, double end_time,
                           ray::Status status);
/// Begin executing a send. /// Begin executing a send.
/// Executes on send_service_ thread pool. /// Executes on send_service_ thread pool.
void ExecuteSendObject(const ClientID &client_id, const ObjectID &object_id, ray::Status ExecuteSendObject(const ClientID &client_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index, uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index,
const RemoteConnectionInfo &connection_info); const RemoteConnectionInfo &connection_info);
/// This method synchronously sends the object id and object size /// This method synchronously sends the object id and object size
/// to the remote object manager. /// to the remote object manager.
/// Executes on send_service_ thread pool. /// Executes on send_service_ thread pool.
@ -280,8 +324,9 @@ class ObjectManager : public ObjectManagerInterface {
/// This will invoke the object receive on the receive_service_ thread pool. /// This will invoke the object receive on the receive_service_ thread pool.
void ReceivePushRequest(std::shared_ptr<TcpClientConnection> &conn, void ReceivePushRequest(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message); const uint8_t *message);
/// Execute a receive on the receive_service_ thread pool. /// Execute a receive on the receive_service_ thread pool.
void ExecuteReceiveObject(const ClientID &client_id, const ObjectID &object_id, ray::Status ExecuteReceiveObject(const ClientID &client_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size, uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index, TcpClientConnection &conn); uint64_t chunk_index, TcpClientConnection &conn);
@ -351,6 +396,10 @@ class ObjectManager : public ObjectManagerInterface {
unfulfilled_push_requests_; unfulfilled_push_requests_;
std::unordered_map<ObjectID, PullRequest> pull_requests_; std::unordered_map<ObjectID, PullRequest> pull_requests_;
/// Profiling events that are to be batched together and added to the profile
/// table in the GCS.
std::vector<ProfileEventT> profile_events_;
}; };
} // namespace ray } // namespace ray

View file

@ -12,7 +12,7 @@ class RayConfig {
int64_t ray_protocol_version() const { return ray_protocol_version_; } int64_t ray_protocol_version() const { return ray_protocol_version_; }
uint64_t handler_warning_timeout_ms() const { return handler_warning_timeout_ms_; } int64_t handler_warning_timeout_ms() const { return handler_warning_timeout_ms_; }
int64_t heartbeat_timeout_milliseconds() const { int64_t heartbeat_timeout_milliseconds() const {
return heartbeat_timeout_milliseconds_; return heartbeat_timeout_milliseconds_;
@ -147,7 +147,7 @@ class RayConfig {
/// The duration that a single handler on the event loop can take before a /// The duration that a single handler on the event loop can take before a
/// warning is logged that the handler is taking too long. /// warning is logged that the handler is taking too long.
uint64_t handler_warning_timeout_ms_; int64_t handler_warning_timeout_ms_;
/// The duration between heartbeats. These are sent by the plasma manager and /// The duration between heartbeats. These are sent by the plasma manager and
/// local scheduler. /// local scheduler.

View file

@ -50,6 +50,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
gcs_client_(gcs_client), gcs_client_(gcs_client),
heartbeat_timer_(io_service), heartbeat_timer_(io_service),
heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)), heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)),
object_manager_profile_timer_(io_service),
local_resources_(config.resource_config), local_resources_(config.resource_config),
local_available_resources_(config.resource_config), local_available_resources_(config.resource_config),
worker_pool_(config.num_initial_workers, config.num_workers_per_process, worker_pool_(config.num_initial_workers, config.num_workers_per_process,
@ -174,6 +175,9 @@ ray::Status NodeManager::RegisterGcs() {
// Start sending heartbeats to the GCS. // Start sending heartbeats to the GCS.
last_heartbeat_at_ms_ = current_time_ms(); last_heartbeat_at_ms_ = current_time_ms();
Heartbeat(); Heartbeat();
// Start the timer that gets object manager profiling information and sends it
// to the GCS.
GetObjectManagerProfileInfo();
return ray::Status::OK(); return ray::Status::OK();
} }
@ -280,6 +284,34 @@ void NodeManager::Heartbeat() {
}); });
} }
void NodeManager::GetObjectManagerProfileInfo() {
int64_t start_time_ms = current_time_ms();
auto profile_info = object_manager_.GetAndResetProfilingInfo();
if (profile_info.profile_events.size() > 0) {
flatbuffers::FlatBufferBuilder fbb;
auto message = CreateProfileTableData(fbb, &profile_info);
fbb.Finish(message);
auto profile_message = flatbuffers::GetRoot<ProfileTableData>(fbb.GetBufferPointer());
RAY_CHECK_OK(gcs_client_->profile_table().AddProfileEventBatch(*profile_message));
}
// Reset the timer.
object_manager_profile_timer_.expires_from_now(heartbeat_period_);
object_manager_profile_timer_.async_wait(
[this](const boost::system::error_code &error) {
RAY_CHECK(!error);
GetObjectManagerProfileInfo();
});
int64_t interval = current_time_ms() - start_time_ms;
if (interval > RayConfig::instance().handler_warning_timeout_ms()) {
RAY_LOG(WARNING) << "GetObjectManagerProfileInfo handler took " << interval << " ms.";
}
}
void NodeManager::ClientAdded(const ClientTableDataT &client_data) { void NodeManager::ClientAdded(const ClientTableDataT &client_data) {
const ClientID client_id = ClientID::from_binary(client_data.client_id); const ClientID client_id = ClientID::from_binary(client_data.client_id);

View file

@ -104,8 +104,15 @@ class NodeManager {
/// \param client_data Data associated with the removed client. /// \param client_data Data associated with the removed client.
/// \return Void. /// \return Void.
void ClientRemoved(const ClientTableDataT &client_data); void ClientRemoved(const ClientTableDataT &client_data);
/// Send heartbeats to the GCS. /// Send heartbeats to the GCS.
void Heartbeat(); void Heartbeat();
/// Get profiling information from the object manager and push it to the GCS.
///
/// \return Void.
void GetObjectManagerProfileInfo();
/// Handler for a heartbeat notification from the GCS. /// Handler for a heartbeat notification from the GCS.
/// ///
/// \param client The GCS client. /// \param client The GCS client.
@ -339,6 +346,9 @@ class NodeManager {
boost::asio::steady_timer heartbeat_timer_; boost::asio::steady_timer heartbeat_timer_;
/// The period used for the heartbeat timer. /// The period used for the heartbeat timer.
std::chrono::milliseconds heartbeat_period_; std::chrono::milliseconds heartbeat_period_;
/// The timer used to get profiling information from the object manager and
/// push it to the GCS.
boost::asio::steady_timer object_manager_profile_timer_;
/// The time that the last heartbeat was sent at. Used to make sure we are /// The time that the last heartbeat was sent at. Used to make sure we are
/// keeping up with heartbeats. /// keeping up with heartbeats.
uint64_t last_heartbeat_at_ms_; uint64_t last_heartbeat_at_ms_;

View file

@ -29,6 +29,18 @@ inline int64_t current_sys_time_ms() {
return ms_since_epoch.count(); return ms_since_epoch.count();
} }
/// Return the current std::chrono::system_clock time as a whole number of
/// microseconds since that clock's epoch.
inline int64_t current_sys_time_us() {
  const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
  return std::chrono::duration_cast<std::chrono::microseconds>(since_epoch).count();
}
/// Return the value of current_sys_time_us() converted to seconds as a
/// double, so the fractional part carries the sub-second component.
inline double current_sys_time_seconds() {
  const int64_t now_us = current_sys_time_us();
  const int64_t us_per_second = 1000000;
  return static_cast<double>(now_us) / us_per_second;
}
inline ray::Status boost_to_ray_status(const boost::system::error_code &error) { inline ray::Status boost_to_ray_status(const boost::system::error_code &error) {
switch (error.value()) { switch (error.value()) {
case boost::system::errc::success: case boost::system::errc::success:

View file

@ -2,6 +2,7 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import json
import os import os
import re import re
import setproctitle import setproctitle
@ -17,6 +18,7 @@ import pytest
import ray import ray
import ray.ray_constants as ray_constants import ray.ray_constants as ray_constants
import ray.test.cluster_utils
import ray.test.test_utils import ray.test.test_utils
@ -1037,6 +1039,58 @@ def test_profiling_api(shutdown_only):
break break
@pytest.fixture()
def ray_start_cluster():
    """Yield a Cluster built with default arguments, then clean up.

    After the test finishes, Ray is shut down first and the cluster second.
    """
    test_cluster = ray.test.cluster_utils.Cluster()
    yield test_cluster
    # Teardown: everything below the yield runs once the test is done.
    ray.shutdown()
    test_cluster.shutdown()
def test_object_transfer_dump(ray_start_cluster):
    """Check that object transfers between nodes show up in the dump.

    Each node gets a custom resource named after its index so that tasks
    can be pinned to it. One object is created per node and then fetched by
    a task on every node, after which every node is expected to appear as
    the "pid" of both a send and a receive event.
    """
    cluster = ray_start_cluster

    num_nodes = 3
    for node_index in range(num_nodes):
        cluster.add_node(
            resources={str(node_index): 1}, object_store_memory=10**9)
    ray.init(redis_address=cluster.redis_address)

    @ray.remote
    def f(x):
        return

    def submit_on_every_node(argument):
        # Launch one task per node, pinned through that node's resource.
        return [
            f._submit(args=[argument], resources={str(node_index): 1})
            for node_index in range(num_nodes)
        ]

    # These objects will live on different nodes.
    object_ids = submit_on_every_node(1)

    # Broadcast each object from each machine to each other machine.
    for object_id in object_ids:
        ray.get(submit_on_every_node(object_id))

    # The profiling information only flushes once every second.
    time.sleep(1.1)

    transfer_dump = ray.global_state.chrome_tracing_object_transfer_dump()
    # Make sure the transfer dump can be serialized with JSON.
    json.loads(json.dumps(transfer_dump))
    assert len(transfer_dump) >= num_nodes**2

    def pids_with_event(event_name):
        # Distinct "pid" values among the events carrying this name.
        return {
            event["pid"]
            for event in transfer_dump if event["name"] == event_name
        }

    assert len(pids_with_event("transfer_receive")) == num_nodes
    assert len(pids_with_event("transfer_send")) == num_nodes
def test_identical_function_names(shutdown_only): def test_identical_function_names(shutdown_only):
# Define a bunch of remote functions and make sure that we don't # Define a bunch of remote functions and make sure that we don't
# accidentally call an older version. # accidentally call an older version.