Add 'ray stat' command for debugging (#6622)

* wip

* wip

* wip

* iterate

* move

* fix thread safety
This commit is contained in:
Eric Liang 2019-12-28 14:40:32 -08:00 committed by GitHub
parent 92db13023c
commit 677004ee3d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 111 additions and 21 deletions

View file

@ -91,6 +91,10 @@ The Ray Command Line API
:prog: ray stack
:show-nested:
.. click:: ray.scripts.scripts:stat
:prog: ray stat
:show-nested:
.. click:: ray.scripts.scripts:timeline
:prog: ray timeline
:show-nested:

View file

@ -823,15 +823,15 @@ def clusterbenchmark():
@cli.command()
@click.option(
"--redis-address",
"--address",
required=False,
type=str,
help="Override the redis address to connect to.")
def timeline(redis_address):
if not redis_address:
redis_address = services.find_redis_address_or_die()
logger.info("Connecting to Ray instance at {}.".format(redis_address))
ray.init(redis_address=redis_address)
def timeline(address):
if not address:
address = services.find_redis_address_or_die()
logger.info("Connecting to Ray instance at {}.".format(address))
ray.init(address=address)
time = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
filename = "/tmp/ray-timeline-{}.json".format(time)
ray.timeline(filename=filename)
@ -841,6 +841,34 @@ def timeline(redis_address):
"You can open this with chrome://tracing in the Chrome browser.")
@cli.command()
@click.option(
"--address",
required=False,
type=str,
help="Override the address to connect to.")
def stat(address):
if not address:
address = services.find_redis_address_or_die()
logger.info("Connecting to Ray instance at {}.".format(address))
ray.init(address=address)
import grpc
from ray.core.generated import node_manager_pb2
from ray.core.generated import node_manager_pb2_grpc
for raylet in ray.nodes():
raylet_address = "{}:{}".format(raylet["NodeManagerAddress"],
ray.nodes()[0]["NodeManagerPort"])
logger.info("Querying raylet {}".format(raylet_address))
channel = grpc.insecure_channel(raylet_address)
stub = node_manager_pb2_grpc.NodeManagerServiceStub(channel)
reply = stub.GetNodeStats(
node_manager_pb2.NodeStatsRequest(), timeout=2.0)
print(reply)
cli.add_command(start)
cli.add_command(stop)
cli.add_command(create_or_update, name="up")
@ -856,6 +884,7 @@ cli.add_command(get_head_ip, name="get_head_ip")
cli.add_command(get_worker_ips)
cli.add_command(microbenchmark)
cli.add_command(stack)
cli.add_command(stat)
cli.add_command(timeline)
cli.add_command(project_cli)
cli.add_command(session_cli)

View file

@ -60,9 +60,9 @@ WorkerContext::WorkerContext(WorkerType worker_type, const JobID &job_id)
// For worker main thread which initializes the WorkerContext,
// set task_id according to whether current worker is a driver.
// (For other threads it's set to random ID via GetThreadContext).
GetThreadContext(true).SetCurrentTaskId((worker_type_ == WorkerType::DRIVER)
? TaskID::ForDriverTask(job_id)
: TaskID::Nil());
GetThreadContext().SetCurrentTaskId((worker_type_ == WorkerType::DRIVER)
? TaskID::ForDriverTask(job_id)
: TaskID::Nil());
}
const WorkerType WorkerContext::GetWorkerType() const { return worker_type_; }
@ -148,7 +148,7 @@ int WorkerContext::CurrentActorMaxConcurrency() const {
bool WorkerContext::CurrentActorIsAsync() const { return current_actor_is_asyncio_; }
WorkerThreadContext &WorkerContext::GetThreadContext(bool for_main_thread) {
WorkerThreadContext &WorkerContext::GetThreadContext() {
if (thread_context_ == nullptr) {
thread_context_ = std::unique_ptr<WorkerThreadContext>(new WorkerThreadContext());
}

View file

@ -72,7 +72,7 @@ class WorkerContext {
boost::thread::id main_thread_id_;
private:
static WorkerThreadContext &GetThreadContext(bool for_main_thread = false);
static WorkerThreadContext &GetThreadContext();
/// Per-thread worker context.
static thread_local std::unique_ptr<WorkerThreadContext> thread_context_;

View file

@ -195,6 +195,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
// behaviour. TODO(ekl) backoff exponentially.
RAY_LOG(ERROR) << "Will resubmit task after a 5 second delay: "
<< spec.DebugString();
absl::MutexLock lock(&mutex_);
to_resubmit_.push_back(std::make_pair(current_time_ms() + 5000, spec));
}));
@ -292,8 +293,13 @@ void CoreWorker::RunIOService() {
void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
worker_context_.SetCurrentTaskId(task_id);
main_thread_task_id_ = task_id;
bool not_actor_task = false;
{
absl::MutexLock lock(&mutex_);
not_actor_task = actor_id_.IsNil();
}
// Clear all actor handles at the end of each non-actor task.
if (actor_id_.IsNil() && task_id.IsNil()) {
if (not_actor_task && task_id.IsNil()) {
absl::MutexLock lock(&actor_handles_mutex_);
for (const auto &handle : actor_handles_) {
RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr));
@ -327,6 +333,7 @@ void CoreWorker::ReportActiveObjectIDs() {
}
void CoreWorker::InternalHeartbeat() {
absl::MutexLock lock(&mutex_);
while (!to_resubmit_.empty() && current_time_ms() > to_resubmit_.front().first) {
RAY_CHECK_OK(direct_task_submitter_->SubmitTask(to_resubmit_.front().second));
to_resubmit_.pop_front();
@ -847,6 +854,11 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
worker_context_.SetCurrentTask(task_spec);
SetCurrentTaskId(task_spec.TaskId());
{
absl::MutexLock lock(&mutex_);
current_task_ = task_spec;
}
RayFunction func{task_spec.GetLanguage(), task_spec.FunctionDescriptor()};
std::vector<std::shared_ptr<RayObject>> args;
@ -868,7 +880,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
return_ids.pop_back();
task_type = TaskType::ACTOR_CREATION_TASK;
SetActorId(task_spec.ActorCreationId());
RAY_LOG(INFO) << "Creating actor: " << actor_id_;
RAY_LOG(INFO) << "Creating actor: " << task_spec.ActorCreationId();
} else if (task_spec.IsActorTask()) {
RAY_CHECK(return_ids.size() > 0);
return_ids.pop_back();
@ -908,6 +920,10 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
SetCurrentTaskId(TaskID::Nil());
worker_context_.ResetCurrentTask(task_spec);
{
absl::MutexLock lock(&mutex_);
current_task_ = TaskSpecification();
}
return status;
}
@ -1069,7 +1085,14 @@ void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request,
void CoreWorker::HandleGetCoreWorkerStats(const rpc::GetCoreWorkerStatsRequest &request,
rpc::GetCoreWorkerStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
absl::MutexLock lock(&mutex_);
reply->set_webui_display(webui_display_);
auto stats = reply->mutable_core_worker_stats();
stats->set_num_pending_tasks(task_manager_->NumPendingTasks());
stats->set_num_object_ids_in_scope(reference_counter_->NumObjectIDsInScope());
if (!current_task_.TaskId().IsNil()) {
stats->set_current_task_desc(current_task_.DebugString());
}
send_reply_callback(Status::OK(), nullptr, nullptr);
}
@ -1092,4 +1115,15 @@ void CoreWorker::GetAsync(const ObjectID &object_id, SetResultCallback success_c
});
}
void CoreWorker::SetActorId(const ActorID &actor_id) {
absl::MutexLock lock(&mutex_);
RAY_CHECK(actor_id_.IsNil());
actor_id_ = actor_id;
}
void CoreWorker::SetWebuiDisplay(const std::string &message) {
absl::MutexLock lock(&mutex_);
webui_display_ = message;
}
} // namespace ray

View file

@ -98,12 +98,9 @@ class CoreWorker {
const JobID &GetCurrentJobId() const { return worker_context_.GetCurrentJobID(); }
void SetActorId(const ActorID &actor_id) {
RAY_CHECK(actor_id_.IsNil());
actor_id_ = actor_id;
}
void SetActorId(const ActorID &actor_id);
void SetWebuiDisplay(const std::string &message) { webui_display_ = message; }
void SetWebuiDisplay(const std::string &message);
/// Increase the reference count for this object ID.
/// Increase the local reference count for this object ID. Should be called
@ -638,11 +635,19 @@ class CoreWorker {
/// Fields related to task execution.
///
/// Protects around accesses to fields below. This should only ever be held
/// for short-running periods of time.
mutable absl::Mutex mutex_;
/// Our actor ID. If this is nil, then we execute only stateless tasks.
ActorID actor_id_;
ActorID actor_id_ GUARDED_BY(mutex_);
/// The currently executing task spec. We have to track this separately since
/// we cannot access the thread-local worker contexts from GetCoreWorkerStats()
TaskSpecification current_task_ GUARDED_BY(mutex_);
/// String to be displayed on Web UI.
std::string webui_display_;
std::string webui_display_ GUARDED_BY(mutex_);
/// Event loop where tasks are processed.
boost::asio::io_service task_execution_service_;
@ -671,7 +676,7 @@ class CoreWorker {
std::unique_ptr<CoreWorkerDirectTaskReceiver> direct_task_receiver_;
// Queue of tasks to resubmit when the specified time passes.
std::deque<std::pair<int64_t, TaskSpecification>> to_resubmit_;
std::deque<std::pair<int64_t, TaskSpecification>> to_resubmit_ GUARDED_BY(mutex_);
friend class CoreWorkerTest;
};

View file

@ -84,6 +84,9 @@ class TaskManager : public TaskFinisherInterface {
/// Return the spec for a pending task.
TaskSpecification GetTaskSpec(const TaskID &task_id) const;
/// Return the number of pending tasks.
int NumPendingTasks() const { return pending_tasks_.size(); }
private:
/// Treat a pending task as failed. The lock should not be held when calling
/// this method because it may trigger callbacks in this or other classes.

View file

@ -161,3 +161,13 @@ message ResourceMapEntry {
// The set of resource ids assigned.
repeated ResourceId resource_ids = 2;
}
// Debug info returned from the core worker.
message CoreWorkerStats {
// Debug string of the currently executing task.
string current_task_desc = 1;
// Number of pending normal and actor tasks.
int32 num_pending_tasks = 2;
// Number of object ids in local scope.
int32 num_object_ids_in_scope = 3;
}

View file

@ -133,6 +133,8 @@ message GetCoreWorkerStatsRequest {
message GetCoreWorkerStatsReply {
// String displayed on Web UI.
string webui_display = 1;
// Debug information returned from the core worker.
CoreWorkerStats core_worker_stats = 2;
}
service CoreWorkerService {

View file

@ -55,6 +55,8 @@ message WorkerStats {
bool is_driver = 2;
// String displayed on Web UI.
string webui_display = 3;
// Debug information returned from the core worker.
CoreWorkerStats core_worker_stats = 4;
}
message ViewData {

View file

@ -3013,6 +3013,7 @@ void NodeManager::HandleNodeStatsRequest(const rpc::NodeStatsRequest &request,
worker_stats->set_is_driver(false);
reply->set_num_workers(reply->num_workers() + 1);
worker_stats->set_webui_display(r.webui_display());
worker_stats->mutable_core_worker_stats()->MergeFrom(r.core_worker_stats());
if (reply->num_workers() == all_workers.size()) {
send_reply_callback(Status::OK(), nullptr, nullptr);
}