[GCS]Add job id to log (#11331)

This commit is contained in:
fangfengbin 2020-10-12 13:53:08 +08:00 committed by GitHub
parent 0d09a17c64
commit 31117b5e96
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 117 additions and 65 deletions

View file

@ -130,7 +130,8 @@ Status ServiceBasedActorInfoAccessor::GetAll(
Status ServiceBasedActorInfoAccessor::AsyncGet(
const ActorID &actor_id, const OptionalItemCallback<rpc::ActorTableData> &callback) {
RAY_LOG(DEBUG) << "Getting actor info, actor id = " << actor_id;
RAY_LOG(DEBUG) << "Getting actor info, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
rpc::GetActorInfoRequest request;
request.set_actor_id(actor_id.Binary());
client_impl_->GetGcsRpcClient().GetActorInfo(
@ -142,7 +143,8 @@ Status ServiceBasedActorInfoAccessor::AsyncGet(
callback(status, boost::none);
}
RAY_LOG(DEBUG) << "Finished getting actor info, status = " << status
<< ", actor id = " << actor_id;
<< ", actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
});
return Status::OK();
}
@ -246,7 +248,8 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribe(
const ActorID &actor_id,
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing update operations of actor, actor id = " << actor_id;
RAY_LOG(DEBUG) << "Subscribing update operations of actor, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
RAY_CHECK(subscribe != nullptr) << "Failed to subscribe actor, actor id = " << actor_id;
auto fetch_data_operation = [this, actor_id,
@ -285,13 +288,14 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribe(
}
Status ServiceBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id) {
RAY_LOG(DEBUG) << "Cancelling subscription to an actor, actor id = " << actor_id;
RAY_LOG(DEBUG) << "Cancelling subscription to an actor, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
auto status = client_impl_->GetGcsPubSub().Unsubscribe(ACTOR_CHANNEL, actor_id.Hex());
absl::MutexLock lock(&mutex_);
subscribe_operations_.erase(actor_id);
fetch_data_operations_.erase(actor_id);
RAY_LOG(DEBUG) << "Finished cancelling subscription to an actor, actor id = "
<< actor_id;
<< actor_id << ", job id = " << actor_id.JobId();
return status;
}
@ -302,7 +306,8 @@ Status ServiceBasedActorInfoAccessor::AsyncAddCheckpoint(
ActorCheckpointID checkpoint_id =
ActorCheckpointID::FromBinary(data_ptr->checkpoint_id());
RAY_LOG(DEBUG) << "Adding actor checkpoint, actor id = " << actor_id
<< ", checkpoint id = " << checkpoint_id;
<< ", checkpoint id = " << checkpoint_id
<< ", job id = " << actor_id.JobId();
rpc::AddActorCheckpointRequest request;
request.mutable_checkpoint_data()->CopyFrom(*data_ptr);
@ -316,7 +321,8 @@ Status ServiceBasedActorInfoAccessor::AsyncAddCheckpoint(
}
RAY_LOG(DEBUG) << "Finished adding actor checkpoint, status = " << status
<< ", actor id = " << actor_id
<< ", checkpoint id = " << checkpoint_id;
<< ", checkpoint id = " << checkpoint_id
<< ", job id = " << actor_id.JobId();
done_callback();
});
};
@ -328,20 +334,22 @@ Status ServiceBasedActorInfoAccessor::AsyncAddCheckpoint(
Status ServiceBasedActorInfoAccessor::AsyncGetCheckpoint(
const ActorCheckpointID &checkpoint_id, const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorCheckpointData> &callback) {
RAY_LOG(DEBUG) << "Getting actor checkpoint, checkpoint id = " << checkpoint_id;
RAY_LOG(DEBUG) << "Getting actor checkpoint, checkpoint id = " << checkpoint_id
<< ", job id = " << actor_id.JobId();
rpc::GetActorCheckpointRequest request;
request.set_actor_id(actor_id.Binary());
request.set_checkpoint_id(checkpoint_id.Binary());
client_impl_->GetGcsRpcClient().GetActorCheckpoint(
request, [checkpoint_id, callback](const Status &status,
const rpc::GetActorCheckpointReply &reply) {
request, [checkpoint_id, actor_id, callback](
const Status &status, const rpc::GetActorCheckpointReply &reply) {
if (reply.has_checkpoint_data()) {
callback(status, reply.checkpoint_data());
} else {
callback(status, boost::none);
}
RAY_LOG(DEBUG) << "Finished getting actor checkpoint, status = " << status
<< ", checkpoint id = " << checkpoint_id;
<< ", checkpoint id = " << checkpoint_id
<< ", job id = " << actor_id.JobId();
});
return Status::OK();
}
@ -349,7 +357,8 @@ Status ServiceBasedActorInfoAccessor::AsyncGetCheckpoint(
Status ServiceBasedActorInfoAccessor::AsyncGetCheckpointID(
const ActorID &actor_id,
const OptionalItemCallback<rpc::ActorCheckpointIdData> &callback) {
RAY_LOG(DEBUG) << "Getting actor checkpoint id, actor id = " << actor_id;
RAY_LOG(DEBUG) << "Getting actor checkpoint id, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
rpc::GetActorCheckpointIDRequest request;
request.set_actor_id(actor_id.Binary());
client_impl_->GetGcsRpcClient().GetActorCheckpointID(
@ -361,7 +370,8 @@ Status ServiceBasedActorInfoAccessor::AsyncGetCheckpointID(
callback(status, boost::none);
}
RAY_LOG(DEBUG) << "Finished getting actor checkpoint id, status = " << status
<< ", actor id = " << actor_id;
<< ", actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
});
return Status::OK();
}
@ -854,7 +864,8 @@ Status ServiceBasedTaskInfoAccessor::AsyncAdd(
Status ServiceBasedTaskInfoAccessor::AsyncGet(
const TaskID &task_id, const OptionalItemCallback<rpc::TaskTableData> &callback) {
RAY_LOG(DEBUG) << "Getting task, task id = " << task_id;
RAY_LOG(DEBUG) << "Getting task, task id = " << task_id
<< ", job id = " << task_id.JobId();
rpc::GetTaskRequest request;
request.set_task_id(task_id.Binary());
client_impl_->GetGcsRpcClient().GetTask(
@ -865,7 +876,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncGet(
callback(status, boost::none);
}
RAY_LOG(DEBUG) << "Finished getting task, status = " << status
<< ", task id = " << task_id;
<< ", task id = " << task_id << ", job id = " << task_id.JobId();
});
return Status::OK();
}
@ -892,7 +903,8 @@ Status ServiceBasedTaskInfoAccessor::AsyncDelete(const std::vector<TaskID> &task
Status ServiceBasedTaskInfoAccessor::AsyncSubscribe(
const TaskID &task_id, const SubscribeCallback<TaskID, rpc::TaskTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id;
RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id
<< ", job id = " << task_id.JobId();
auto fetch_data_operation = [this, task_id,
subscribe](const StatusCallback &fetch_done) {
@ -928,11 +940,13 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribe(
}
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id;
RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id
<< ", job id = " << task_id.JobId();
auto status = client_impl_->GetGcsPubSub().Unsubscribe(TASK_CHANNEL, task_id.Hex());
subscribe_task_operations_.erase(task_id);
fetch_task_data_operations_.erase(task_id);
RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id;
RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id
<< ", job id = " << task_id.JobId();
return status;
}
@ -941,7 +955,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease(
TaskID task_id = TaskID::FromBinary(data_ptr->task_id());
NodeID node_id = NodeID::FromBinary(data_ptr->node_manager_id());
RAY_LOG(DEBUG) << "Adding task lease, task id = " << task_id
<< ", node id = " << node_id;
<< ", node id = " << node_id << ", job id = " << task_id.JobId();
rpc::AddTaskLeaseRequest request;
request.mutable_task_lease_data()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().AddTaskLease(
@ -951,14 +965,16 @@ Status ServiceBasedTaskInfoAccessor::AsyncAddTaskLease(
callback(status);
}
RAY_LOG(DEBUG) << "Finished adding task lease, status = " << status
<< ", task id = " << task_id << ", node id = " << node_id;
<< ", task id = " << task_id << ", node id = " << node_id
<< ", job id = " << task_id.JobId();
});
return Status::OK();
}
Status ServiceBasedTaskInfoAccessor::AsyncGetTaskLease(
const TaskID &task_id, const OptionalItemCallback<rpc::TaskLeaseData> &callback) {
RAY_LOG(DEBUG) << "Getting task lease, task id = " << task_id;
RAY_LOG(DEBUG) << "Getting task lease, task id = " << task_id
<< ", job id = " << task_id.JobId();
rpc::GetTaskLeaseRequest request;
request.set_task_id(task_id.Binary());
client_impl_->GetGcsRpcClient().GetTaskLease(
@ -970,7 +986,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncGetTaskLease(
callback(status, boost::none);
}
RAY_LOG(DEBUG) << "Finished getting task lease, status = " << status
<< ", task id = " << task_id;
<< ", task id = " << task_id << ", job id = " << task_id.JobId();
});
return Status::OK();
}
@ -980,7 +996,8 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease(
const SubscribeCallback<TaskID, boost::optional<rpc::TaskLeaseData>> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr)
<< "Failed to subscribe task lease, task id = " << task_id;
<< "Failed to subscribe task lease, task id = " << task_id
<< ", job id = " << task_id.JobId();
auto fetch_data_operation = [this, task_id,
subscribe](const StatusCallback &fetch_done) {
@ -1014,33 +1031,39 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease(
}
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Unsubscribing task lease, task id = " << task_id;
RAY_LOG(DEBUG) << "Unsubscribing task lease, task id = " << task_id
<< ", job id = " << task_id.JobId();
auto status =
client_impl_->GetGcsPubSub().Unsubscribe(TASK_LEASE_CHANNEL, task_id.Hex());
subscribe_task_lease_operations_.erase(task_id);
fetch_task_lease_data_operations_.erase(task_id);
RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id;
RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id
<< ", job id = " << task_id.JobId();
return status;
}
Status ServiceBasedTaskInfoAccessor::AttemptTaskReconstruction(
const std::shared_ptr<rpc::TaskReconstructionData> &data_ptr,
const StatusCallback &callback) {
auto num_reconstructions = data_ptr->num_reconstructions();
NodeID node_id = NodeID::FromBinary(data_ptr->node_manager_id());
RAY_LOG(DEBUG) << "Reconstructing task, reconstructions num = "
<< data_ptr->num_reconstructions() << ", node id = " << node_id;
TaskID task_id = TaskID::FromBinary(data_ptr->task_id());
RAY_LOG(DEBUG) << "Reconstructing task, reconstructions num = " << num_reconstructions
<< ", node id = " << node_id << ", task id = " << task_id
<< ", job id = " << task_id.JobId();
rpc::AttemptTaskReconstructionRequest request;
request.mutable_task_reconstruction()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().AttemptTaskReconstruction(
request,
[data_ptr, node_id, callback](const Status &status,
const rpc::AttemptTaskReconstructionReply &reply) {
[num_reconstructions, node_id, task_id, callback](
const Status &status, const rpc::AttemptTaskReconstructionReply &reply) {
if (callback) {
callback(status);
}
RAY_LOG(DEBUG) << "Finished reconstructing task, status = " << status
<< ", reconstructions num = " << data_ptr->num_reconstructions()
<< ", node id = " << node_id;
<< ", reconstructions num = " << num_reconstructions
<< ", node id = " << node_id << ", task id = " << task_id
<< ", job id = " << task_id.JobId();
});
return Status::OK();
}
@ -1088,7 +1111,8 @@ ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor(
Status ServiceBasedObjectInfoAccessor::AsyncGetLocations(
const ObjectID &object_id,
const OptionalItemCallback<rpc::ObjectLocationInfo> &callback) {
RAY_LOG(DEBUG) << "Getting object locations, object id = " << object_id;
RAY_LOG(DEBUG) << "Getting object locations, object id = " << object_id
<< ", job id = " << object_id.TaskId().JobId();
rpc::GetObjectLocationsRequest request;
request.set_object_id(object_id.Binary());
client_impl_->GetGcsRpcClient().GetObjectLocations(
@ -1096,7 +1120,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncGetLocations(
const rpc::GetObjectLocationsReply &reply) {
callback(status, reply.location_info());
RAY_LOG(DEBUG) << "Finished getting object locations, status = " << status
<< ", object id = " << object_id;
<< ", object id = " << object_id
<< ", job id = " << object_id.TaskId().JobId();
});
return Status::OK();
}
@ -1123,7 +1148,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i
const NodeID &node_id,
const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Adding object location, object id = " << object_id
<< ", node id = " << node_id;
<< ", node id = " << node_id
<< ", job id = " << object_id.TaskId().JobId();
rpc::AddObjectLocationRequest request;
request.set_object_id(object_id.Binary());
request.set_node_id(node_id.Binary());
@ -1138,7 +1164,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddLocation(const ObjectID &object_i
}
RAY_LOG(DEBUG) << "Finished adding object location, status = " << status
<< ", object id = " << object_id << ", node id = " << node_id;
<< ", object id = " << object_id << ", node id = " << node_id
<< ", job id = " << object_id.TaskId().JobId();
done_callback();
});
};
@ -1151,7 +1178,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl(
const ObjectID &object_id, const std::string &spilled_url,
const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Adding object spilled location, object id = " << object_id
<< ", spilled_url = " << spilled_url;
<< ", spilled_url = " << spilled_url
<< ", job id = " << object_id.TaskId().JobId();
rpc::AddObjectLocationRequest request;
request.set_object_id(object_id.Binary());
request.set_spilled_url(spilled_url);
@ -1175,7 +1203,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncAddSpilledUrl(
Status ServiceBasedObjectInfoAccessor::AsyncRemoveLocation(
const ObjectID &object_id, const NodeID &node_id, const StatusCallback &callback) {
RAY_LOG(DEBUG) << "Removing object location, object id = " << object_id
<< ", node id = " << node_id;
<< ", node id = " << node_id
<< ", job id = " << object_id.TaskId().JobId();
rpc::RemoveObjectLocationRequest request;
request.set_object_id(object_id.Binary());
request.set_node_id(node_id.Binary());
@ -1189,7 +1218,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncRemoveLocation(
callback(status);
}
RAY_LOG(DEBUG) << "Finished removing object location, status = " << status
<< ", object id = " << object_id << ", node id = " << node_id;
<< ", object id = " << object_id << ", node id = " << node_id
<< ", job id = " << object_id.TaskId().JobId();
done_callback();
});
};
@ -1203,7 +1233,8 @@ Status ServiceBasedObjectInfoAccessor::AsyncSubscribeToLocations(
const SubscribeCallback<ObjectID, std::vector<rpc::ObjectLocationChange>> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr)
<< "Failed to subscribe object location, object id = " << object_id;
<< "Failed to subscribe object location, object id = " << object_id
<< ", job id = " << object_id.TaskId().JobId();
auto fetch_data_operation = [this, object_id,
subscribe](const StatusCallback &fetch_done) {
@ -1281,12 +1312,14 @@ void ServiceBasedObjectInfoAccessor::AsyncResubscribe(bool is_pubsub_server_rest
Status ServiceBasedObjectInfoAccessor::AsyncUnsubscribeToLocations(
const ObjectID &object_id) {
RAY_LOG(DEBUG) << "Unsubscribing object location, object id = " << object_id;
RAY_LOG(DEBUG) << "Unsubscribing object location, object id = " << object_id
<< ", job id = " << object_id.TaskId().JobId();
auto status = client_impl_->GetGcsPubSub().Unsubscribe(OBJECT_CHANNEL, object_id.Hex());
absl::MutexLock lock(&mutex_);
subscribe_object_operations_.erase(object_id);
fetch_object_data_operations_.erase(object_id);
RAY_LOG(DEBUG) << "Finished unsubscribing object location, object id = " << object_id;
RAY_LOG(DEBUG) << "Finished unsubscribing object location, object id = " << object_id
<< ", job id = " << object_id.TaskId().JobId();
return status;
}

View file

@ -462,7 +462,8 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request,
auto iter = registered_actors_.find(actor_id);
if (iter == registered_actors_.end()) {
RAY_LOG(INFO) << "Actor " << actor_id << " may be already destroyed.";
RAY_LOG(DEBUG) << "Actor " << actor_id
<< " may be already destroyed, job id = " << actor_id.JobId();
return Status::Invalid("Actor may be already destroyed.");
}
@ -490,7 +491,8 @@ Status GcsActorManager::CreateActor(const ray::rpc::CreateActorRequest &request,
// After GCS restarts, the state of the actor may not be `DEPENDENCIES_UNREADY`.
if (iter->second->GetState() != rpc::ActorTableData::DEPENDENCIES_UNREADY) {
RAY_LOG(INFO) << "Actor " << actor_id
<< " is already in the process of creation. Skip it directly.";
<< " is already in the process of creation. Skip it directly, job id = "
<< actor_id.JobId();
return Status::OK();
}
@ -525,7 +527,8 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
auto &workers = owners_[owner_node_id];
auto it = workers.find(owner_id);
if (it == workers.end()) {
RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id;
RAY_LOG(DEBUG) << "Adding owner " << owner_id << " of actor " << actor_id
<< ", job id = " << actor_id.JobId();
std::shared_ptr<rpc::CoreWorkerClientInterface> client =
worker_client_factory_(actor->GetOwnerAddress());
it = workers.emplace(owner_id, Owner(std::move(client))).first;
@ -539,10 +542,13 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
wait_request, [this, owner_node_id, owner_id, actor_id](
Status status, const rpc::WaitForActorOutOfScopeReply &reply) {
if (!status.ok()) {
RAY_LOG(INFO) << "Worker " << owner_id << " failed, destroying actor child.";
RAY_LOG(INFO) << "Worker " << owner_id
<< " failed, destroying actor child, job id = "
<< actor_id.JobId();
} else {
RAY_LOG(INFO) << "Actor " << actor_id
<< " is out of scope, destroying actor child.";
<< " is out of scope, destroying actor child, job id = "
<< actor_id.JobId();
}
auto node_it = owners_.find(owner_node_id);
@ -555,7 +561,8 @@ void GcsActorManager::PollOwnerForActorOutOfScope(
}
void GcsActorManager::DestroyActor(const ActorID &actor_id) {
RAY_LOG(INFO) << "Destroying actor, actor id = " << actor_id;
RAY_LOG(INFO) << "Destroying actor, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
actor_to_register_callbacks_.erase(actor_id);
actor_to_create_callbacks_.erase(actor_id);
auto it = registered_actors_.find(actor_id);
@ -780,7 +787,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
// If the owner and this actor is dead at the same time, the actor
// could've been destroyed and dereigstered before reconstruction.
if (actor == nullptr) {
RAY_LOG(INFO) << "Actor is destroyed before reconstruction, actor id = " << actor_id;
RAY_LOG(DEBUG) << "Actor is destroyed before reconstruction, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
return;
}
auto node_id = actor->GetNodeID();
@ -801,7 +809,8 @@ void GcsActorManager::ReconstructActor(const ActorID &actor_id, bool need_resche
}
RAY_LOG(INFO) << "Actor is failed " << actor_id << " on worker " << worker_id
<< " at node " << node_id << ", need_reschedule = " << need_reschedule
<< ", remaining_restarts = " << remaining_restarts;
<< ", remaining_restarts = " << remaining_restarts
<< ", job id = " << actor_id.JobId();
if (remaining_restarts != 0) {
// num_restarts must be set before updating GCS, or num_restarts will be inconsistent
// between memory cache and storage.
@ -858,7 +867,8 @@ void GcsActorManager::OnActorCreationFailed(std::shared_ptr<GcsActor> actor) {
void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &actor) {
auto actor_id = actor->GetActorID();
RAY_LOG(INFO) << "Actor created successfully, actor id = " << actor_id;
RAY_LOG(INFO) << "Actor created successfully, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
// NOTE: If an actor is deleted immediately after the user creates the actor, reference
// counter may return a reply to the request of WaitForActorOutOfScope to GCS server,
// and GCS server will destroy the actor. The actor creation is asynchronous, it may be
@ -971,7 +981,8 @@ void GcsActorManager::LoadInitialData(const EmptyCallback &done) {
// We could not reschedule actors in state of `DEPENDENCIES_UNREADY` because the
// dependencies of them may not have been resolved yet.
RAY_LOG(INFO) << "Rescheduling a non-alive actor, actor id = "
<< actor->GetActorID() << ", state = " << actor->GetState();
<< actor->GetActorID() << ", state = " << actor->GetState()
<< ", job id = " << actor->GetActorID().JobId();
gcs_actor_scheduler_->Reschedule(actor);
}
}
@ -1073,7 +1084,8 @@ void GcsActorManager::RemoveUnresolvedActor(const std::shared_ptr<GcsActor> &act
void GcsActorManager::RemoveActorFromOwner(const std::shared_ptr<GcsActor> &actor) {
const auto &actor_id = actor->GetActorID();
const auto &owner_id = actor->GetOwnerID();
RAY_LOG(INFO) << "Erasing actor " << actor_id << " owned by " << owner_id;
RAY_LOG(DEBUG) << "Erasing actor " << actor_id << " owned by " << owner_id
<< ", job id = " << actor_id.JobId();
const auto &owner_node_id = actor->GetOwnerNodeID();
auto &node = owners_[owner_node_id];

View file

@ -67,9 +67,10 @@ void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
void GcsActorScheduler::Reschedule(std::shared_ptr<GcsActor> actor) {
if (!actor->GetWorkerID().IsNil()) {
RAY_LOG(INFO)
<< "Actor " << actor->GetActorID()
<< " is already tied to a leased worker. Create actor directly on worker.";
RAY_LOG(INFO) << "Actor " << actor->GetActorID()
<< " is already tied to a leased worker. Create actor directly on "
"worker. Job id = "
<< actor->GetActorID().JobId();
auto leased_worker = std::make_shared<GcsLeasedWorker>(
actor->GetAddress(),
VectorFromProtobuf(actor->GetMutableActorTableData()->resource_mapping()),
@ -193,7 +194,7 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
auto node_id = NodeID::FromBinary(node->node_id());
RAY_LOG(INFO) << "Start leasing worker from node " << node_id << " for actor "
<< actor->GetActorID();
<< actor->GetActorID() << ", job id = " << actor->GetActorID().JobId();
// We need to ensure that the RequestWorkerLease won't be sent before the reply of
// ReleaseUnusedWorkers is returned.
@ -222,10 +223,12 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
auto actor_iter = iter->second.find(actor->GetActorID());
if (actor_iter == iter->second.end()) {
// if actor is not in leasing state, it means it is cancelled.
RAY_LOG(INFO) << "Raylet granted a lease request, but the outstanding lease "
"request for "
<< actor->GetActorID()
<< " has been already cancelled. The response will be ignored.";
RAY_LOG(INFO)
<< "Raylet granted a lease request, but the outstanding lease "
"request for "
<< actor->GetActorID()
<< " has been already cancelled. The response will be ignored. Job id = "
<< actor->GetActorID().JobId();
return;
}
@ -237,7 +240,8 @@ void GcsActorScheduler::LeaseWorkerFromNode(std::shared_ptr<GcsActor> actor,
node_to_actors_when_leasing_.erase(iter);
}
RAY_LOG(INFO) << "Finished leasing worker from " << node_id << " for actor "
<< actor->GetActorID();
<< actor->GetActorID()
<< ", job id = " << actor->GetActorID().JobId();
HandleWorkerLeasedReply(actor, reply);
} else {
RetryLeasingWorkerFromNode(actor, node);
@ -263,7 +267,7 @@ void GcsActorScheduler::DoRetryLeasingWorkerFromNode(
// the node, so try leasing again.
RAY_CHECK(iter->second.count(actor->GetActorID()) != 0);
RAY_LOG(INFO) << "Retry leasing worker from " << actor->GetNodeID() << " for actor "
<< actor->GetActorID();
<< actor->GetActorID() << ", job id = " << actor->GetActorID().JobId();
LeaseWorkerFromNode(actor, node);
}
}
@ -321,7 +325,8 @@ void GcsActorScheduler::CreateActorOnWorker(std::shared_ptr<GcsActor> actor,
std::shared_ptr<GcsLeasedWorker> worker) {
RAY_CHECK(actor && worker);
RAY_LOG(INFO) << "Start creating actor " << actor->GetActorID() << " on worker "
<< worker->GetWorkerID() << " at node " << actor->GetNodeID();
<< worker->GetWorkerID() << " at node " << actor->GetNodeID()
<< ", job id = " << actor->GetActorID().JobId();
std::unique_ptr<rpc::PushTaskRequest> request(new rpc::PushTaskRequest());
request->set_intended_worker_id(worker->GetWorkerID().Binary());
@ -359,7 +364,8 @@ void GcsActorScheduler::CreateActorOnWorker(std::shared_ptr<GcsActor> actor,
}
RAY_LOG(INFO) << "Succeeded in creating actor " << actor->GetActorID()
<< " on worker " << worker->GetWorkerID() << " at node "
<< actor->GetNodeID();
<< actor->GetNodeID()
<< ", job id = " << actor->GetActorID().JobId();
schedule_success_handler_(actor);
} else {
RetryCreatingActorOnWorker(actor, worker);
@ -386,7 +392,8 @@ void GcsActorScheduler::DoRetryCreatingActorOnWorker(
// The worker is erased from creating map only when `CancelOnNode`
// or `CancelOnWorker` or the actor is created successfully.
RAY_LOG(INFO) << "Retry creating actor " << actor->GetActorID() << " on worker "
<< worker->GetWorkerID() << " at node " << actor->GetNodeID();
<< worker->GetWorkerID() << " at node " << actor->GetNodeID()
<< ", job id = " << actor->GetActorID().JobId();
CreateActorOnWorker(actor, worker);
}
}