mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
[C++ API] expose C++ task failure event (#18596)
This commit is contained in:
parent
b5c5247ad4
commit
187e4a86ca
7 changed files with 29 additions and 5 deletions
|
@ -20,6 +20,8 @@
|
|||
|
||||
#include "../../util/function_helper.h"
|
||||
#include "../abstract_ray_runtime.h"
|
||||
#include "ray/util/event.h"
|
||||
#include "ray/util/event_label.h"
|
||||
|
||||
namespace ray {
|
||||
|
||||
|
@ -159,6 +161,11 @@ Status TaskExecutor::ExecuteTask(
|
|||
if (!status.ok()) {
|
||||
if (status.IsIntentionalSystemExit()) {
|
||||
return status;
|
||||
} else {
|
||||
RAY_EVENT(ERROR, EL_RAY_CPP_TASK_FAILED)
|
||||
.WithField("task_type", TaskType_Name(task_type))
|
||||
.WithField("function_name", func_name)
|
||||
<< "C++ task failed: " << status.ToString();
|
||||
}
|
||||
|
||||
std::string meta_str = std::to_string(ray::rpc::ErrorType::TASK_EXECUTION_EXCEPTION);
|
||||
|
@ -189,6 +196,10 @@ Status TaskExecutor::ExecuteTask(
|
|||
}
|
||||
|
||||
RAY_CHECK_OK(CoreWorkerProcess::GetCoreWorker().SealReturnObject(result_id, result));
|
||||
} else {
|
||||
if (!status.ok()) {
|
||||
return ray::Status::CreationTaskError();
|
||||
}
|
||||
}
|
||||
return ray::Status::OK();
|
||||
}
|
||||
|
|
|
@ -246,6 +246,10 @@ TEST(RayClusterModeTest, ExceptionTest) {
|
|||
auto actor1 = ray::Actor(RAY_FUNC(Counter::FactoryCreate, int)).Remote(1);
|
||||
auto object1 = actor1.Task(&Counter::ExceptionFunc).Remote();
|
||||
EXPECT_THROW(object1.Get(), ray::internal::RayTaskException);
|
||||
|
||||
auto actor2 = ray::Actor(Counter::FactoryCreateException).Remote();
|
||||
auto object2 = actor2.Task(&Counter::Plus1).Remote();
|
||||
EXPECT_THROW(object2.Get(), ray::internal::RayActorException);
|
||||
}
|
||||
|
||||
TEST(RayClusterModeTest, GetAllNodeInfoTest) {
|
||||
|
|
|
@ -21,13 +21,18 @@
|
|||
#include "unistd.h"
|
||||
#endif
|
||||
|
||||
Counter::Counter(int init) {
|
||||
/// Constructs a Counter whose count starts at `init`.
///
/// \param init            initial value stored in `count`.
/// \param with_exception  when true, construction is aborted by throwing
///                        std::invalid_argument("creation error") before any
///                        member is assigned.
/// \throws std::invalid_argument if `with_exception` is true.
Counter::Counter(int init, bool with_exception) {
|
||||
// Deliberate failure path: lets callers simulate an object whose creation fails.
if (with_exception) {
|
||||
throw std::invalid_argument("creation error");
|
||||
}
|
||||
count = init;
|
||||
// Snapshot the restart flag at construction time; later calls to
// ray::WasCurrentActorRestarted() are made separately by other methods.
is_restared = ray::WasCurrentActorRestarted();
|
||||
}
|
||||
|
||||
/// Zero-argument factory: heap-allocates a Counter that starts at 0 and hands
/// the raw pointer back to the caller.
Counter *Counter::FactoryCreate() {
  Counter *fresh = new Counter(0);
  return fresh;
}
|
||||
|
||||
/// Factory that requests a failing construction: it passes
/// with_exception = true to the Counter constructor, which throws
/// std::invalid_argument, so no pointer is ever returned.
Counter *Counter::FactoryCreateException() {
  const bool throw_on_create = true;
  return new Counter(0, throw_on_create);
}
|
||||
|
||||
/// One-argument factory: heap-allocates a Counter that starts at `init` and
/// hands the raw pointer back to the caller.
Counter *Counter::FactoryCreate(int init) {
  Counter *fresh = new Counter(init);
  return fresh;
}
|
||||
|
||||
Counter *Counter::FactoryCreate(int init1, int init2) {
|
||||
|
@ -78,7 +83,8 @@ bool Counter::CheckRestartInActorCreationTask() { return is_restared; }
|
|||
|
||||
/// Returns the result of ray::WasCurrentActorRestarted() evaluated at the
/// moment this method runs.
bool Counter::CheckRestartInActorTask() {
  const bool restarted = ray::WasCurrentActorRestarted();
  return restarted;
}
|
||||
|
||||
RAY_REMOTE(RAY_FUNC(Counter::FactoryCreate), RAY_FUNC(Counter::FactoryCreate, int),
|
||||
RAY_REMOTE(RAY_FUNC(Counter::FactoryCreate), Counter::FactoryCreateException,
|
||||
RAY_FUNC(Counter::FactoryCreate, int),
|
||||
RAY_FUNC(Counter::FactoryCreate, int, int), &Counter::Plus1, &Counter::Add,
|
||||
&Counter::Exit, &Counter::GetPid, &Counter::ExceptionFunc,
|
||||
&Counter::CheckRestartInActorCreationTask, &Counter::CheckRestartInActorTask);
|
||||
|
|
|
@ -21,8 +21,9 @@
|
|||
/// a class of user code
|
||||
class Counter {
|
||||
public:
|
||||
Counter(int init);
|
||||
Counter(int init, bool with_exception = false);
|
||||
static Counter *FactoryCreate();
|
||||
static Counter *FactoryCreateException();
|
||||
static Counter *FactoryCreate(int init);
|
||||
static Counter *FactoryCreate(int init1, int init2);
|
||||
|
||||
|
|
|
@ -315,7 +315,7 @@ void RayEventInit(rpc::Event_SourceType source_type,
|
|||
const std::string &log_dir, const std::string &event_level) {
|
||||
absl::call_once(init_once_, [&source_type, &custom_fields, &log_dir, &event_level]() {
|
||||
RayEventContext::Instance().SetEventContext(source_type, custom_fields);
|
||||
auto event_dir = boost::filesystem::path(log_dir) / boost::filesystem::path("event");
|
||||
auto event_dir = boost::filesystem::path(log_dir) / boost::filesystem::path("events");
|
||||
ray::EventManager::Instance().AddReporter(
|
||||
std::make_shared<ray::LogEventReporter>(source_type, event_dir.string()));
|
||||
SetEventLevel(event_level);
|
||||
|
|
|
@ -28,4 +28,6 @@ namespace ray {
|
|||
|
||||
#define EL_RAY_NODE_REMOVED "RAY_NODE_REMOVED"
|
||||
|
||||
/// Event label attached to the ERROR event reported when a C++ task fails.
#define EL_RAY_CPP_TASK_FAILED "RAY_CPP_TASK_FAILED"
|
||||
|
||||
} // namespace ray
|
||||
|
|
|
@ -449,7 +449,7 @@ TEST(EVENT_TEST, TEST_RAY_EVENT_INIT) {
|
|||
RAY_EVENT(FATAL, "label") << "test error event";
|
||||
|
||||
std::vector<std::string> vc;
|
||||
ReadContentFromFile(vc, log_dir + "/event/event_RAYLET.log");
|
||||
ReadContentFromFile(vc, log_dir + "/events/event_RAYLET.log");
|
||||
EXPECT_EQ((int)vc.size(), 1);
|
||||
json out_custom_fields;
|
||||
rpc::Event ele_1 = GetEventFromString(vc.back(), &out_custom_fields);
|
||||
|
|
Loading…
Add table
Reference in a new issue