diff --git a/CMakeLists.txt b/CMakeLists.txt
index 51d065ba3..9f2ed5824 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -23,11 +23,15 @@ include_directories("${NUMPY_INCLUDE_DIR}")
 
 set(PROTO_PATH "${CMAKE_SOURCE_DIR}/protos")
 
+set(GRAPH_PROTO "${PROTO_PATH}/graph.proto")
 set(RAY_PROTO "${PROTO_PATH}/ray.proto")
 set(TYPES_PROTO "${PROTO_PATH}/types.proto")
 set(GENERATED_PROTOBUF_PATH "${CMAKE_BINARY_DIR}/generated")
 file(MAKE_DIRECTORY ${GENERATED_PROTOBUF_PATH})
 
+set(GRAPH_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/graph.pb.cc")
+set(GRAPH_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/graph.pb.h")
+
 set(RAY_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/ray.pb.cc")
 set(RAY_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/ray.pb.h")
 set(RAY_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/ray.grpc.pb.cc")
@@ -35,8 +39,16 @@ set(RAY_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/ray.grpc.pb.h")
 
 set(TYPES_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/types.pb.cc")
 set(TYPES_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/types.pb.h")
-set(TYPES_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/types.grpc.pb.cc")
-set(TYPES_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/types.grpc.pb.h")
+
+add_custom_command(
+    OUTPUT "${GRAPH_PB_H_FILE}"
+           "${GRAPH_PB_CPP_FILE}"
+    COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc
+    ARGS "--proto_path=${PROTO_PATH}"
+         "--cpp_out=${GENERATED_PROTOBUF_PATH}"
+         "${GRAPH_PROTO}"
+    DEPENDS "${GRAPH_PROTO}" "${TYPES_PROTO}"
+    )
 
 add_custom_command(
     OUTPUT "${RAY_PB_H_FILE}"
@@ -57,23 +69,17 @@ add_custom_command(
 add_custom_command(
     OUTPUT "${TYPES_PB_H_FILE}"
            "${TYPES_PB_CPP_FILE}"
-           "${TYPES_GRPC_PB_H_FILE}"
-           "${TYPES_GRPC_PB_CPP_FILE}"
     COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc
     ARGS "--proto_path=${PROTO_PATH}"
          "--cpp_out=${GENERATED_PROTOBUF_PATH}"
          "${TYPES_PROTO}"
-    COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc
-    ARGS "--proto_path=${PROTO_PATH}"
-         "--grpc_out=${GENERATED_PROTOBUF_PATH}"
-         "--plugin=protoc-gen-grpc=${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/grpc_cpp_plugin"
-         "${TYPES_PROTO}"
     )
 
-set(GENERATED_PROTOBUF_FILES ${RAY_PB_H_FILE} ${RAY_PB_CPP_FILE}
+set(GENERATED_PROTOBUF_FILES
+    ${GRAPH_PB_H_FILE} ${GRAPH_PB_CPP_FILE}
+    ${RAY_PB_H_FILE} ${RAY_PB_CPP_FILE}
     ${RAY_GRPC_PB_H_FILE} ${RAY_GRPC_PB_CPP_FILE}
-    ${TYPES_PB_H_FILE} ${TYPES_PB_CPP_FILE}
-    ${TYPES_GRPC_PB_H_FILE} ${TYPES_GRPC_PB_CPP_FILE})
+    ${TYPES_PB_H_FILE} ${TYPES_PB_CPP_FILE})
 
 include_directories(${GENERATED_PROTOBUF_PATH})
 
diff --git a/build.sh b/build.sh
index 33a1355c8..c73f4f7c5 100755
--- a/build.sh
+++ b/build.sh
@@ -18,3 +18,7 @@ pushd "$ROOT_DIR/build"
   cmake ..
   make install -j$PARALLEL
 popd
+
+pushd "$ROOT_DIR/scripts/"
+  ./gen-protobuf.sh
+popd
diff --git a/lib/python/ray/__init__.py b/lib/python/ray/__init__.py
index e67973a5c..e4de683ea 100644
--- a/lib/python/ray/__init__.py
+++ b/lib/python/ray/__init__.py
@@ -8,5 +8,6 @@ PYTHON_MODE = 3
 
 import libraylib as lib
 import serialization
-from worker import scheduler_info, task_info, register_module, connect, disconnect, get, put, remote
+from worker import scheduler_info, dump_computation_graph, task_info, register_module, connect, disconnect, get, put, remote
 from libraylib import ObjRef
+import internal
diff --git a/lib/python/ray/internal/__init__.py b/lib/python/ray/internal/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/lib/python/ray/worker.py b/lib/python/ray/worker.py
index 4099d712a..b9649c23f 100644
--- a/lib/python/ray/worker.py
+++ b/lib/python/ray/worker.py
@@ -143,6 +143,10 @@ def print_task_info(task_data, mode):
 def scheduler_info(worker=global_worker):
   return ray.lib.scheduler_info(worker.handle);
 
+def dump_computation_graph(file_name, worker=global_worker):
+  """Write the scheduler's current computation graph to file_name as a serialized CompGraph protobuf."""
+  ray.lib.dump_computation_graph(worker.handle, file_name)
+
 def task_info(worker=global_worker):
   """Tell the scheduler to return task information. Currently includes a list of all failed tasks since the start of the cluster."""
   return ray.lib.task_info(worker.handle);
diff --git a/protos/graph.proto b/protos/graph.proto
new file mode 100644
index 000000000..983b906ed
--- /dev/null
+++ b/protos/graph.proto
@@ -0,0 +1,34 @@
+syntax = "proto3";
+
+import "types.proto";
+
+message Task {
+  string name = 1; // Name of the function call
+  repeated Value arg = 2; // List of arguments, can be either object references or protobuf descriptions of object passed by value
+  repeated uint64 result = 3; // Object references for result
+}
+
+message Put {
+  uint64 objref = 1; // The objref for the object that was put
+}
+
+// This is used internally by the scheduler. From the scheduler's perspective,
+// the submission of tasks (via SubmitTask) and the submission of puts (via
+// PutObj) look very similar, and so it is useful to be able to handle them
+// together (for example in the computation graph).
+message Operation {
+  Task task = 1;
+  Put put = 2;
+  uint64 creator_operationid = 3; // The id of the task that called this task or put.
+}
+
+message TaskStatus {
+  uint64 operationid = 1;
+  string function_name = 2;
+  string worker_address = 3;
+  string error_message = 4;
+}
+
+message CompGraph {
+  repeated Operation operation = 1;
+}
diff --git a/protos/ray.proto b/protos/ray.proto
index 32f0f2097..7b5bf862b 100644
--- a/protos/ray.proto
+++ b/protos/ray.proto
@@ -12,6 +12,7 @@
 
 syntax = "proto3";
 
+import "graph.proto";
 import "types.proto";
 
 // Scheduler
@@ -153,6 +154,7 @@ message SchedulerInfoReply {
   map function_table = 2; // Table of all available remote function
   repeated uint64 target_objref = 4; // The target_objrefs_ data structure
   repeated uint64 reference_count = 5; // The reference_counts_ data structure
+  CompGraph computation_graph = 6; // The computation graph constructed so far
 }
 
 // Object stores
diff --git a/protos/types.proto b/protos/types.proto
index 8de582055..2a91abad3 100644
--- a/protos/types.proto
+++ b/protos/types.proto
@@ -65,33 +65,6 @@ message Value {
   Obj obj = 2; // For pass by value
 }
 
-message Task {
-  string name = 1; // Name of the function call
-  repeated Value arg = 2; // List of arguments, can be either object references or protobuf descriptions of object passed by value
-  repeated uint64 result = 3; // Object references for result
-}
-
-message Put {
-  uint64 objref = 1; // The objref for the object that was put
-}
-
-// This is used internally by the scheduler. From the scheduler's perspective,
-// the submission of tasks (via SubmitTask) and the submission of puts (via
-// PutObj) look very similar, and so it is useful to be able to handle them
-// together (for example in the computation graph).
-message Operation {
-  Task task = 1;
-  Put put = 2;
-  uint64 creator_operationid = 3; // The id of the task that called this task or put.
-}
-
-message TaskStatus {
-  uint64 operationid = 1;
-  string function_name = 2;
-  string worker_address = 3;
-  string error_message = 4;
-}
-
 message Array {
   repeated uint64 shape = 1;
   sint64 dtype = 2;
diff --git a/scripts/gen-protobuf.sh b/scripts/gen-protobuf.sh
new file mode 100755
index 000000000..9a5ceed58
--- /dev/null
+++ b/scripts/gen-protobuf.sh
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+
+# Stop at the first failing command so a protoc error is not silently ignored.
+set -e
+
+# Resolve every path from the script's own location so the script works from
+# any working directory.
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+PROTOC="$DIR/../thirdparty/grpc/bins/opt/protobuf/protoc"
+PROTO_DIR="$DIR/../protos"
+PYTHON_OUT="$DIR/../lib/python/ray/internal"
+
+# TODO(mehrdad): How would this look in windows, where does the protoc executable go?
+# On Linux, we compile it ourselves, on Windows we might not want to do that (?)
+mkdir -p "$PYTHON_OUT"
+"$PROTOC" -I "$PROTO_DIR" --python_out="$PYTHON_OUT" "$PROTO_DIR/graph.proto"
+"$PROTOC" -I "$PROTO_DIR" --python_out="$PYTHON_OUT" "$PROTO_DIR/types.proto"
diff --git a/src/computation_graph.cc b/src/computation_graph.cc
index f2db39a29..2aa696286 100644
--- a/src/computation_graph.cc
+++ b/src/computation_graph.cc
@@ -19,3 +19,9 @@ const Task& ComputationGraph::get_task(OperationId operationid) {
   RAY_CHECK(operations_[operationid]->has_task(), "Calling get_task with operationid " << operationid << ", but this corresponds to a put not a task.");
   return operations_[operationid]->task();
 }
+
+void ComputationGraph::to_protobuf(CompGraph* computation_graph) {
+  for (const auto& operation : operations_) {
+    computation_graph->add_operation()->CopyFrom(*operation);
+  }
+}
diff --git a/src/computation_graph.h b/src/computation_graph.h
index 98ead96f3..2918569b8 100644
--- a/src/computation_graph.h
+++ b/src/computation_graph.h
@@ -5,7 +5,8 @@
 #include
 
 #include "ray/ray.h"
-#include "ray.grpc.pb.h"
+
+#include "graph.pb.h"
 #include "types.pb.h"
 
 // used to represent the root operation (that is, the driver code)
@@ -21,6 +22,8 @@ public:
   // Return the task corresponding to a particular OperationId. If operationid
   // corresponds to a put, then fail.
   const Task& get_task(OperationId operationid);
+  // Serialize the computation graph to ProtoBuf and store it in computation_graph
+  void to_protobuf(CompGraph* computation_graph);
 private:
   // maps an OperationId to the corresponding task or put
   std::vector > operations_;
diff --git a/src/raylib.cc b/src/raylib.cc
index 9c9f760a6..2c2ff279b 100644
--- a/src/raylib.cc
+++ b/src/raylib.cc
@@ -824,6 +824,21 @@ PyObject* task_info(PyObject* self, PyObject* args) {
   return dict;
 }
 
+PyObject* dump_computation_graph(PyObject* self, PyObject* args) {
+  Worker* worker;
+  const char* output_file_name;
+  if (!PyArg_ParseTuple(args, "O&s", &PyObjectToWorker, &worker, &output_file_name)) {
+    return NULL;
+  }
+  ClientContext context;
+  SchedulerInfoRequest request;
+  SchedulerInfoReply reply;
+  worker->scheduler_info(context, request, reply);
+  std::fstream output(output_file_name, std::ios::out | std::ios::trunc | std::ios::binary);
+  RAY_CHECK(reply.computation_graph().SerializeToOstream(&output), "Cannot dump computation graph to file " << output_file_name);
+  Py_RETURN_NONE;
+}
+
 PyObject* set_log_config(PyObject* self, PyObject* args) {
   const char* log_file_name;
   if (!PyArg_ParseTuple(args, "s", &log_file_name)) {
@@ -859,6 +874,7 @@ static PyMethodDef RayLibMethods[] = {
   { "start_worker_service", start_worker_service, METH_VARARGS, "start the worker service" },
   { "scheduler_info", scheduler_info, METH_VARARGS, "get info about scheduler state" },
   { "task_info", task_info, METH_VARARGS, "get task statuses" },
+  { "dump_computation_graph", dump_computation_graph, METH_VARARGS, "dump the current computation graph to a file" },
   { "set_log_config", set_log_config, METH_VARARGS, "set filename for raylib logging" },
   { NULL, NULL, 0, NULL }
 };
diff --git a/src/scheduler.cc b/src/scheduler.cc
index e1d4e02c8..7e0b07d22 100644
--- a/src/scheduler.cc
+++ b/src/scheduler.cc
@@ -494,6 +494,7 @@ void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerIn
   for (const WorkerId& entry : avail_workers_) {
     reply->add_avail_worker(entry);
   }
+  computation_graph_.to_protobuf(reply->mutable_computation_graph());
   release_all_locks();
 }
 