From 31117b5e9669682e8db6bbc23ed097ec7d2fa872 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Mon, 12 Oct 2020 13:53:08 +0800 Subject: [PATCH] [GCS]Add job id to log (#11331) --- .../gcs/gcs_client/service_based_accessor.cc | 115 +++++++++++------- src/ray/gcs/gcs_server/gcs_actor_manager.cc | 34 ++++-- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 33 +++-- 3 files changed, 117 insertions(+), 65 deletions(-) diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index 6677bf238..9133e564b 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -130,7 +130,8 @@ Status ServiceBasedActorInfoAccessor::GetAll( Status ServiceBasedActorInfoAccessor::AsyncGet( const ActorID &actor_id, const OptionalItemCallback &callback) { - RAY_LOG(DEBUG) << "Getting actor info, actor id = " << actor_id; + RAY_LOG(DEBUG) << "Getting actor info, actor id = " << actor_id + << ", job id = " << actor_id.JobId(); rpc::GetActorInfoRequest request; request.set_actor_id(actor_id.Binary()); client_impl_->GetGcsRpcClient().GetActorInfo( @@ -142,7 +143,8 @@ Status ServiceBasedActorInfoAccessor::AsyncGet( callback(status, boost::none); } RAY_LOG(DEBUG) << "Finished getting actor info, status = " << status - << ", actor id = " << actor_id; + << ", actor id = " << actor_id + << ", job id = " << actor_id.JobId(); }); return Status::OK(); } @@ -246,7 +248,8 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribe( const ActorID &actor_id, const SubscribeCallback &subscribe, const StatusCallback &done) { - RAY_LOG(DEBUG) << "Subscribing update operations of actor, actor id = " << actor_id; + RAY_LOG(DEBUG) << "Subscribing update operations of actor, actor id = " << actor_id + << ", job id = " << actor_id.JobId(); RAY_CHECK(subscribe != nullptr) << "Failed to subscribe actor, actor id = " << actor_id; auto fetch_data_operation = [this, actor_id, @@ -285,13 +288,14 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribe( } Status ServiceBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) { - RAY_LOG(DEBUG) << "Cancelling subscription to an actor, actor id = " << actor_id; + RAY_LOG(DEBUG) << "Cancelling subscription to an actor, actor id = " << actor_id + << ", job id = " << actor_id.JobId(); auto status = client_impl_->GetGcsPubSub().Unsubscribe(ACTOR_CHANNEL, actor_id.Hex()); absl::MutexLock lock(&mutex_); subscribe_operations_.erase(actor_id); fetch_data_operations_.erase(actor_id); RAY_LOG(DEBUG) << "Finished cancelling subscription to an actor, actor id = " - << actor_id; + << actor_id << ", job id = " << actor_id.JobId(); return status; } @@ -302,7 +306,8 @@ Status ServiceBasedActorInfoAccessor::AsyncAddCheckpoint( ActorCheckpointID checkpoint_id = ActorCheckpointID::FromBinary(data_ptr->checkpoint_id()); RAY_LOG(DEBUG) << "Adding actor checkpoint, actor id = " << actor_id - << ", checkpoint id = " << checkpoint_id; + << ", checkpoint id = " << checkpoint_id + << ", job id = " << actor_id.JobId(); rpc::AddActorCheckpointRequest request; request.mutable_checkpoint_data()->CopyFrom(*data_ptr); @@ -316,7 +321,8 @@ Status ServiceBasedActorInfoAccessor::AsyncAddCheckpoint( } RAY_LOG(DEBUG) << "Finished adding actor checkpoint, status = " << status << ", actor id = " << actor_id - << ", checkpoint id = " << checkpoint_id; + << ", checkpoint id = " << checkpoint_id + << ", job id = " << actor_id.JobId(); done_callback(); }); }; @@ -328,20 +334,22 @@ Status ServiceBasedActorInfoAccessor::AsyncAddCheckpoint( Status ServiceBasedActorInfoAccessor::AsyncGetCheckpoint( const ActorCheckpointID &checkpoint_id, const ActorID &actor_id, const OptionalItemCallback &callback) { - RAY_LOG(DEBUG) << "Getting actor checkpoint, checkpoint id = " << checkpoint_id; + RAY_LOG(DEBUG) << "Getting actor checkpoint, checkpoint id = " << checkpoint_id + << ", job id = " << actor_id.JobId(); rpc::GetActorCheckpointRequest request; request.set_actor_id(actor_id.Binary()); request.set_checkpoint_id(checkpoint_id.Binary()); client_impl_->GetGcsRpcClient().GetActorCheckpoint( - request, [checkpoint_id, callback](const Status &status, - const rpc::GetActorCheckpointReply &reply) { + request, [checkpoint_id, actor_id, callback]( + const Status &status, const rpc::GetActorCheckpointReply &reply) { if (reply.has_checkpoint_data()) { callback(status, reply.checkpoint_data()); } else { callback(status, boost::none); } RAY_LOG(DEBUG) << "Finished getting actor checkpoint, status = " << status - << ", checkpoint id = " << checkpoint_id; + << ", checkpoint id = " << checkpoint_id + << ", job id = " << actor_id.JobId(); }); return Status::OK(); } @@ -349,7 +357,8 @@ Status ServiceBasedActorInfoAccessor::AsyncGetCheckpoint( Status ServiceBasedActorInfoAccessor::AsyncGetCheckpointID( const ActorID &actor_id, const OptionalItemCallback &callback) { - RAY_LOG(DEBUG) << "Getting actor checkpoint id, actor id = " << actor_id; + RAY_LOG(DEBUG) << "Getting actor checkpoint id, actor id = " << actor_id + << ", job id = " << actor_id.JobId(); rpc::GetActorCheckpointIDRequest request; request.set_actor_id(actor_id.Binary()); client_impl_->GetGcsRpcClient().GetActorCheckpointID( @@ -361,7 +370,8 @@ Status ServiceBasedActorInfoAccessor::AsyncGetCheckpointID( callback(status, boost::none); } RAY_LOG(DEBUG) << "Finished getting actor checkpoint id, status = " << status - << ", actor id = " << actor_id; + << ", actor id = " << actor_id + << ", job id = " << actor_id.JobId(); }); return Status::OK(); } @@ -854,7 +864,8 @@ Status ServiceBasedTaskInfoAccessor::AsyncAdd( Status ServiceBasedTaskInfoAccessor::AsyncGet( const TaskID &task_id, const OptionalItemCallback &callback) { - RAY_LOG(DEBUG) << "Getting task, task id = " << task_id; + RAY_LOG(DEBUG) << "Getting task, task id = " << task_id + << ", job id = " << task_id.JobId(); rpc::GetTaskRequest request; request.set_task_id(task_id.Binary()); client_impl_->GetGcsRpcClient().GetTask( @@ -865,7 +876,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncGet( callback(status, boost::none); } RAY_LOG(DEBUG) << "Finished getting task, status = " << status - << ", task id = " << task_id; + << ", task id = " << task_id << ", job id = " << task_id.JobId(); }); return Status::OK(); } @@ -892,7 +903,8 @@ Status ServiceBasedTaskInfoAccessor::AsyncDelete(const std::vector &task Status ServiceBasedTaskInfoAccessor::AsyncSubscribe( const TaskID &task_id, const SubscribeCallback &subscribe, const StatusCallback &done) { - RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id; + RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id + << ", job id = " << task_id.JobId(); auto fetch_data_operation = [this, task_id, subscribe](const StatusCallback &fetch_done) { @@ -928,11 +940,13 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribe( } Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) { - RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id; + RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id + << ", job id = " << task_id.JobId(); auto status = client_impl_->GetGcsPubSub().Unsubscribe(TASK_CHANNEL, task_id.Hex()); subscribe_task_operations_.erase(task_id); fetch_task_data_operations_.erase(task_id); - RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id; + RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id + << ", job id = " << task_id.JobId(); return status; } @@ -941,7 +955,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease( TaskID task_id = TaskID::FromBinary(data_ptr->task_id()); NodeID node_id = NodeID::FromBinary(data_ptr->node_manager_id()); RAY_LOG(DEBUG) << "Adding task lease, task id = " << task_id - << ", node id = " << node_id; + << ", node id = " << node_id << ", job id = " << task_id.JobId(); rpc::AddTaskLeaseRequest request; request.mutable_task_lease_data()->CopyFrom(*data_ptr); client_impl_->GetGcsRpcClient().AddTaskLease( @@ -951,14 +965,16 @@ Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease( callback(status); } RAY_LOG(DEBUG) << "Finished adding task lease, status = " << status - << ", task id = " << task_id << ", node id = " << node_id; + << ", task id = " << task_id << ", node id = " << node_id + << ", job id = " << task_id.JobId(); }); return Status::OK(); } Status ServiceBasedTaskInfoAccessor::AsyncGetTaskLease( const TaskID &task_id, const OptionalItemCallback &callback) { - RAY_LOG(DEBUG) << "Getting task lease, task id = " << task_id; + RAY_LOG(DEBUG) << "Getting task lease, task id = " << task_id + << ", job id = " << task_id.JobId(); rpc::GetTaskLeaseRequest request; request.set_task_id(task_id.Binary()); client_impl_->GetGcsRpcClient().GetTaskLease( @@ -970,7 +986,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncGetTaskLease( callback(status, boost::none); } RAY_LOG(DEBUG) << "Finished getting task lease, status = " << status - << ", task id = " << task_id; + << ", task id = " << task_id << ", job id = " << task_id.JobId(); }); return Status::OK(); } @@ -980,7 +996,8 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease( const SubscribeCallback> &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr) - << "Failed to subscribe task lease, task id = " << task_id; + << "Failed to subscribe task lease, task id = " << task_id + << ", job id = " << task_id.JobId(); auto fetch_data_operation = [this, task_id, subscribe](const StatusCallback &fetch_done) { @@ -1014,33 +1031,39 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease( } Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) { - RAY_LOG(DEBUG) << "Unsubscribing task lease, task id = " << task_id; + RAY_LOG(DEBUG) << "Unsubscribing task lease, task id = " << task_id + << ", job id = " << task_id.JobId(); auto status = client_impl_->GetGcsPubSub().Unsubscribe(TASK_LEASE_CHANNEL, task_id.Hex()); subscribe_task_lease_operations_.erase(task_id); fetch_task_lease_data_operations_.erase(task_id); - RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id; + RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id + << ", job id = " << task_id.JobId(); return status; } Status ServiceBasedTaskInfoAccessor::AttemptTaskReconstruction( const std::shared_ptr &data_ptr, const StatusCallback &callback) { + auto num_reconstructions = data_ptr->num_reconstructions(); NodeID node_id = NodeID::FromBinary(data_ptr->node_manager_id()); - RAY_LOG(DEBUG) << "Reconstructing task, reconstructions num = " - << data_ptr->num_reconstructions() << ", node id = " << node_id; + TaskID task_id = TaskID::FromBinary(data_ptr->task_id()); + RAY_LOG(DEBUG) << "Reconstructing task, reconstructions num = " << num_reconstructions + << ", node id = " << node_id << ", task id = " << task_id + << ", job id = " << task_id.JobId(); rpc::AttemptTaskReconstructionRequest request; request.mutable_task_reconstruction()->CopyFrom(*data_ptr); client_impl_->GetGcsRpcClient().AttemptTaskReconstruction( request, - [data_ptr, node_id, callback](const Status &status, - const rpc::AttemptTaskReconstructionReply &reply) { + [num_reconstructions, node_id, task_id, callback]( + const Status &status, const rpc::AttemptTaskReconstructionReply &reply) { if (callback) { callback(status); } RAY_LOG(DEBUG) << "Finished reconstructing task, status = " << status - << ", reconstructions num = " << data_ptr->num_reconstructions() - << ", node id = " << node_id; + << ", reconstructions num = " << num_reconstructions + << ", node id = " << node_id << ", task id = " << task_id + << ", job id = " << task_id.JobId(); }); return Status::OK(); } @@ -1088,7 +1111,8 @@ ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor( Status ServiceBasedObjectInfoAccessor::AsyncGetLocations( const ObjectID &object_id, const OptionalItemCallback &callback) { - RAY_LOG(DEBUG) << "Getting object locations, object id = " << object_id; + RAY_LOG(DEBUG) << "Getting object locations, object id = " << object_id + << ", job id = " << object_id.TaskId().JobId(); rpc::GetObjectLocationsRequest request; request.set_object_id(object_id.Binary()); client_impl_->GetGcsRpcClient().GetObjectLocations( @@ -1096,7 +1120,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncGetLocations( const rpc::GetObjectLocationsReply &reply) { callback(status, reply.location_info()); RAY_LOG(DEBUG) << "Finished getting object locations, status = " << status - << ", object id = " << object_id; + << ", object id = " << object_id + << ", job id = " << object_id.TaskId().JobId(); }); return Status::OK(); } @@ -1123,7 +1148,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i const NodeID &node_id, const StatusCallback &callback) { RAY_LOG(DEBUG) << "Adding object location, object id = " << object_id - << ", node id = " << node_id; + << ", node id = " << node_id + << ", job id = " << object_id.TaskId().JobId(); rpc::AddObjectLocationRequest request; request.set_object_id(object_id.Binary()); request.set_node_id(node_id.Binary()); @@ -1138,7 +1164,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i } RAY_LOG(DEBUG) << "Finished adding object location, status = " << status - << ", object id = " << object_id << ", node id = " << node_id; + << ", object id = " << object_id << ", node id = " << node_id + << ", job id = " << object_id.TaskId().JobId(); done_callback(); }); }; @@ -1151,7 +1178,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl( const ObjectID &object_id, const std::string &spilled_url, const StatusCallback &callback) { RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id - << ", spilled_url = " << spilled_url; + << ", spilled_url = " << spilled_url + << ", job id = " << object_id.TaskId().JobId(); rpc::AddObjectLocationRequest request; request.set_object_id(object_id.Binary()); request.set_spilled_url(spilled_url); @@ -1175,7 +1203,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl( Status ServiceBasedObjectInfoAccessor::AsyncRemoveLocation( const ObjectID &object_id, const NodeID &node_id, const StatusCallback &callback) { RAY_LOG(DEBUG) << "Removing object location, object id = " << object_id - << ", node id = " << node_id; + << ", node id = " << node_id + << ", job id = " << object_id.TaskId().JobId(); rpc::RemoveObjectLocationRequest request; request.set_object_id(object_id.Binary()); request.set_node_id(node_id.Binary()); @@ -1189,7 +1218,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncRemoveLocation( callback(status); } RAY_LOG(DEBUG) << "Finished removing object location, status = " << status - << ", object id = " << object_id << ", node id = " << node_id; + << ", object id = " << object_id << ", node id = " << node_id + << ", job id = " << object_id.TaskId().JobId(); done_callback(); }); }; @@ -1203,7 +1233,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations( const SubscribeCallback> &subscribe, const StatusCallback &done) { RAY_CHECK(subscribe != nullptr) - << "Failed to subscribe object location, object id = " << object_id; + << "Failed to subscribe object location, object id = " << object_id + << ", job id = " << object_id.TaskId().JobId(); auto fetch_data_operation = [this, object_id, subscribe](const StatusCallback &fetch_done) { @@ -1281,12 +1312,14 @@ void ServiceBasedObjectInfoAccessor::AsyncResubscribe(bool is_pubsub_server_rest Status ServiceBasedObjectInfoAccessor::AsyncUnsubscribeToLocations( const ObjectID &object_id) { - RAY_LOG(DEBUG) << "Unsubscribing object location, object id = " << object_id; + RAY_LOG(DEBUG) << "Unsubscribing object location, object id = " << object_id + << ", job id = " << object_id.TaskId().JobId(); auto status = client_impl_->GetGcsPubSub().Unsubscribe(OBJECT_CHANNEL, object_id.Hex()); absl::MutexLock lock(&mutex_); subscribe_object_operations_.erase(object_id); fetch_object_data_operations_.erase(object_id); - RAY_LOG(DEBUG) << "Finished unsubscribing object location, object id = " << object_id; + RAY_LOG(DEBUG) << "Finished unsubscribing object location, object id = " << object_id + << ", job id = " << object_id.TaskId().JobId(); return status; } diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 09435f278..bb4e83a67 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -462,7 +462,8 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request, auto iter = registered_actors_.find(actor_id); if (iter == registered_actors_.end()) { - RAY_LOG(INFO) << "Actor " << actor_id << " may be already destroyed."; + RAY_LOG(DEBUG) << "Actor " << actor_id + << " may be already destroyed, job id = " << actor_id.JobId(); return Status::Invalid("Actor may be already destroyed."); } @@ -490,7 +491,8 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request, // After GCS restarts, the state of the actor may not be `DEPENDENCIES_UNREADY`. if (iter->second->GetState() != rpc::ActorTableData::DEPENDENCIES_UNREADY) { RAY_LOG(INFO) << "Actor " << actor_id - << " is already in the process of creation. Skip it directly."; + << " is already in the process of creation. Skip it directly, job id = " + << actor_id.JobId(); return Status::OK(); } @@ -525,7 +527,8 @@ void GcsActorManager::PollOwnerForActorOutOfScope( auto &workers = owners_[owner_node_id]; auto it = workers.find(owner_id); if (it == workers.end()) { - RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id; + RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id + << ", job id = " << actor_id.JobId(); std::shared_ptr client = worker_client_factory_(actor->GetOwnerAddress()); it = workers.emplace(owner_id, Owner(std::move(client))).first; @@ -539,10 +542,13 @@ void GcsActorManager::PollOwnerForActorOutOfScope( wait_request, [this, owner_node_id, owner_id, actor_id]( Status status, const rpc::WaitForActorOutOfScopeReply &reply) { if (!status.ok()) { - RAY_LOG(INFO) << "Worker " << owner_id << " failed, destroying actor child."; + RAY_LOG(INFO) << "Worker " << owner_id + << " failed, destroying actor child, job id = " + << actor_id.JobId(); } else { RAY_LOG(INFO) << "Actor " << actor_id - << " is out of scope, destroying actor child."; + << " is out of scope, destroying actor child, job id = " + << actor_id.JobId(); } auto node_it = owners_.find(owner_node_id); @@ -555,7 +561,8 @@ void GcsActorManager::PollOwnerForActorOutOfScope( } void GcsActorManager::DestroyActor(const ActorID &actor_id) { - RAY_LOG(INFO) << "Destroying actor, actor id = " << actor_id; + RAY_LOG(INFO) << "Destroying actor, actor id = " << actor_id + << ", job id = " << actor_id.JobId(); actor_to_register_callbacks_.erase(actor_id); actor_to_create_callbacks_.erase(actor_id); auto it = registered_actors_.find(actor_id); @@ -780,7 +787,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche // If the owner and this actor is dead at the same time, the actor // could've been destroyed and dereigstered before reconstruction. if (actor == nullptr) { - RAY_LOG(INFO) << "Actor is destroyed before reconstruction, actor id = " << actor_id; + RAY_LOG(DEBUG) << "Actor is destroyed before reconstruction, actor id = " << actor_id + << ", job id = " << actor_id.JobId(); return; } auto node_id = actor->GetNodeID(); @@ -801,7 +809,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche } RAY_LOG(INFO) << "Actor is failed " << actor_id << " on worker " << worker_id << " at node " << node_id << ", need_reschedule = " << need_reschedule - << ", remaining_restarts = " << remaining_restarts; + << ", remaining_restarts = " << remaining_restarts + << ", job id = " << actor_id.JobId(); if (remaining_restarts != 0) { // num_restarts must be set before updating GCS, or num_restarts will be inconsistent // between memory cache and storage. @@ -858,7 +867,8 @@ void GcsActorManager::OnActorCreationFailed(std::shared_ptr actor) { void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr &actor) { auto actor_id = actor->GetActorID(); - RAY_LOG(INFO) << "Actor created successfully, actor id = " << actor_id; + RAY_LOG(INFO) << "Actor created successfully, actor id = " << actor_id + << ", job id = " << actor_id.JobId(); // NOTE: If an actor is deleted immediately after the user creates the actor, reference // counter may return a reply to the request of WaitForActorOutOfScope to GCS server, // and GCS server will destroy the actor. The actor creation is asynchronous, it may be @@ -971,7 +981,8 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) { // We could not reschedule actors in state of `DEPENDENCIES_UNREADY` because the // dependencies of them may not have been resolved yet. RAY_LOG(INFO) << "Rescheduling a non-alive actor, actor id = " - << actor->GetActorID() << ", state = " << actor->GetState(); + << actor->GetActorID() << ", state = " << actor->GetState() + << ", job id = " << actor->GetActorID().JobId(); gcs_actor_scheduler_->Reschedule(actor); } } @@ -1073,7 +1084,8 @@ void GcsActorManager::RemoveUnresolvedActor(const std::shared_ptr &act void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr &actor) { const auto &actor_id = actor->GetActorID(); const auto &owner_id = actor->GetOwnerID(); - RAY_LOG(INFO) << "Erasing actor " << actor_id << " owned by " << owner_id; + RAY_LOG(DEBUG) << "Erasing actor " << actor_id << " owned by " << owner_id + << ", job id = " << actor_id.JobId(); const auto &owner_node_id = actor->GetOwnerNodeID(); auto &node = owners_[owner_node_id]; diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index bcf8b96da..a1fc20f88 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -67,9 +67,10 @@ void GcsActorScheduler::Schedule(std::shared_ptr actor) { void GcsActorScheduler::Reschedule(std::shared_ptr actor) { if (!actor->GetWorkerID().IsNil()) { - RAY_LOG(INFO) - << "Actor " << actor->GetActorID() - << " is already tied to a leased worker. Create actor directly on worker."; + RAY_LOG(INFO) << "Actor " << actor->GetActorID() + << " is already tied to a leased worker. Create actor directly on " + "worker. Job id = " + << actor->GetActorID().JobId(); auto leased_worker = std::make_shared( actor->GetAddress(), VectorFromProtobuf(actor->GetMutableActorTableData()->resource_mapping()), @@ -193,7 +194,7 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr actor, auto node_id = NodeID::FromBinary(node->node_id()); RAY_LOG(INFO) << "Start leasing worker from node " << node_id << " for actor " - << actor->GetActorID(); + << actor->GetActorID() << ", job id = " << actor->GetActorID().JobId(); // We need to ensure that the RequestWorkerLease won't be sent before the reply of // ReleaseUnusedWorkers is returned. @@ -222,10 +223,12 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr actor, auto actor_iter = iter->second.find(actor->GetActorID()); if (actor_iter == iter->second.end()) { // if actor is not in leasing state, it means it is cancelled. - RAY_LOG(INFO) << "Raylet granted a lease request, but the outstanding lease " - "request for " - << actor->GetActorID() - << " has been already cancelled. The response will be ignored."; + RAY_LOG(INFO) + << "Raylet granted a lease request, but the outstanding lease " + "request for " + << actor->GetActorID() + << " has been already cancelled. The response will be ignored. Job id = " + << actor->GetActorID().JobId(); return; } @@ -237,7 +240,8 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr actor, node_to_actors_when_leasing_.erase(iter); } RAY_LOG(INFO) << "Finished leasing worker from " << node_id << " for actor " - << actor->GetActorID(); + << actor->GetActorID() + << ", job id = " << actor->GetActorID().JobId(); HandleWorkerLeasedReply(actor, reply); } else { RetryLeasingWorkerFromNode(actor, node); @@ -263,7 +267,7 @@ void GcsActorScheduler::DoRetryLeasingWorkerFromNode( // the node, so try leasing again. RAY_CHECK(iter->second.count(actor->GetActorID()) != 0); RAY_LOG(INFO) << "Retry leasing worker from " << actor->GetNodeID() << " for actor " - << actor->GetActorID(); + << actor->GetActorID() << ", job id = " << actor->GetActorID().JobId(); LeaseWorkerFromNode(actor, node); } } @@ -321,7 +325,8 @@ void GcsActorScheduler::CreateActorOnWorker(std::shared_ptr actor, std::shared_ptr worker) { RAY_CHECK(actor && worker); RAY_LOG(INFO) << "Start creating actor " << actor->GetActorID() << " on worker " - << worker->GetWorkerID() << " at node " << actor->GetNodeID(); + << worker->GetWorkerID() << " at node " << actor->GetNodeID() + << ", job id = " << actor->GetActorID().JobId(); std::unique_ptr request(new rpc::PushTaskRequest()); request->set_intended_worker_id(worker->GetWorkerID().Binary()); @@ -359,7 +364,8 @@ void GcsActorScheduler::CreateActorOnWorker(std::shared_ptr actor, } RAY_LOG(INFO) << "Succeeded in creating actor " << actor->GetActorID() << " on worker " << worker->GetWorkerID() << " at node " - << actor->GetNodeID(); + << actor->GetNodeID() + << ", job id = " << actor->GetActorID().JobId(); schedule_success_handler_(actor); } else { RetryCreatingActorOnWorker(actor, worker); @@ -386,7 +392,8 @@ void GcsActorScheduler::DoRetryCreatingActorOnWorker( // The worker is erased from creating map only when `CancelOnNode` // or `CancelOnWorker` or the actor is created successfully. RAY_LOG(INFO) << "Retry creating actor " << actor->GetActorID() << " on worker " - << worker->GetWorkerID() << " at node " << actor->GetNodeID(); + << worker->GetWorkerID() << " at node " << actor->GetNodeID() + << ", job id = " << actor->GetActorID().JobId(); CreateActorOnWorker(actor, worker); } }