mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
Merge pull request #173 from amplab/writegraph
Write computation graph to file
This commit is contained in:
commit
e3ad95b200
13 changed files with 96 additions and 41 deletions
|
@ -23,11 +23,15 @@ include_directories("${NUMPY_INCLUDE_DIR}")
|
|||
|
||||
set(PROTO_PATH "${CMAKE_SOURCE_DIR}/protos")
|
||||
|
||||
set(GRAPH_PROTO "${PROTO_PATH}/graph.proto")
|
||||
set(RAY_PROTO "${PROTO_PATH}/ray.proto")
|
||||
set(TYPES_PROTO "${PROTO_PATH}/types.proto")
|
||||
set(GENERATED_PROTOBUF_PATH "${CMAKE_BINARY_DIR}/generated")
|
||||
file(MAKE_DIRECTORY ${GENERATED_PROTOBUF_PATH})
|
||||
|
||||
set(GRAPH_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/graph.pb.cc")
|
||||
set(GRAPH_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/graph.pb.h")
|
||||
|
||||
set(RAY_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/ray.pb.cc")
|
||||
set(RAY_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/ray.pb.h")
|
||||
set(RAY_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/ray.grpc.pb.cc")
|
||||
|
@ -35,8 +39,15 @@ set(RAY_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/ray.grpc.pb.h")
|
|||
|
||||
set(TYPES_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/types.pb.cc")
|
||||
set(TYPES_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/types.pb.h")
|
||||
set(TYPES_GRPC_PB_CPP_FILE "${GENERATED_PROTOBUF_PATH}/types.grpc.pb.cc")
|
||||
set(TYPES_GRPC_PB_H_FILE "${GENERATED_PROTOBUF_PATH}/types.grpc.pb.h")
|
||||
|
||||
add_custom_command(
|
||||
OUTPUT "${GRAPH_PB_H_FILE}"
|
||||
"${GRAPH_PB_CPP_FILE}"
|
||||
COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc
|
||||
ARGS "--proto_path=${PROTO_PATH}"
|
||||
"--cpp_out=${GENERATED_PROTOBUF_PATH}"
|
||||
"${GRAPH_PROTO}"
|
||||
)
|
||||
|
||||
add_custom_command(
|
||||
OUTPUT "${RAY_PB_H_FILE}"
|
||||
|
@ -57,23 +68,17 @@ add_custom_command(
|
|||
add_custom_command(
|
||||
OUTPUT "${TYPES_PB_H_FILE}"
|
||||
"${TYPES_PB_CPP_FILE}"
|
||||
"${TYPES_GRPC_PB_H_FILE}"
|
||||
"${TYPES_GRPC_PB_CPP_FILE}"
|
||||
COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc
|
||||
ARGS "--proto_path=${PROTO_PATH}"
|
||||
"--cpp_out=${GENERATED_PROTOBUF_PATH}"
|
||||
"${TYPES_PROTO}"
|
||||
COMMAND ${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/protobuf/protoc
|
||||
ARGS "--proto_path=${PROTO_PATH}"
|
||||
"--grpc_out=${GENERATED_PROTOBUF_PATH}"
|
||||
"--plugin=protoc-gen-grpc=${CMAKE_SOURCE_DIR}/thirdparty/grpc/bins/opt/grpc_cpp_plugin"
|
||||
"${TYPES_PROTO}"
|
||||
)
|
||||
|
||||
set(GENERATED_PROTOBUF_FILES ${RAY_PB_H_FILE} ${RAY_PB_CPP_FILE}
|
||||
set(GENERATED_PROTOBUF_FILES
|
||||
${GRAPH_PB_H_FILE} ${GRAPH_PB_CPP_FILE}
|
||||
${RAY_PB_H_FILE} ${RAY_PB_CPP_FILE}
|
||||
${RAY_GRPC_PB_H_FILE} ${RAY_GRPC_PB_CPP_FILE}
|
||||
${TYPES_PB_H_FILE} ${TYPES_PB_CPP_FILE}
|
||||
${TYPES_GRPC_PB_H_FILE} ${TYPES_GRPC_PB_CPP_FILE})
|
||||
${TYPES_PB_H_FILE} ${TYPES_PB_CPP_FILE})
|
||||
|
||||
include_directories(${GENERATED_PROTOBUF_PATH})
|
||||
|
||||
|
|
4
build.sh
4
build.sh
|
@ -18,3 +18,7 @@ pushd "$ROOT_DIR/build"
|
|||
cmake ..
|
||||
make install -j$PARALLEL
|
||||
popd
|
||||
|
||||
pushd "$ROOT_DIR/scripts/"
|
||||
./gen-protobuf.sh
|
||||
popd
|
||||
|
|
|
@ -8,5 +8,6 @@ PYTHON_MODE = 3
|
|||
|
||||
import libraylib as lib
|
||||
import serialization
|
||||
from worker import scheduler_info, task_info, register_module, connect, disconnect, get, put, remote
|
||||
from worker import scheduler_info, dump_computation_graph, task_info, register_module, connect, disconnect, get, put, remote
|
||||
from libraylib import ObjRef
|
||||
import internal
|
||||
|
|
0
lib/python/ray/internal/__init__.py
Normal file
0
lib/python/ray/internal/__init__.py
Normal file
|
@ -143,6 +143,9 @@ def print_task_info(task_data, mode):
|
|||
def scheduler_info(worker=global_worker):
|
||||
return ray.lib.scheduler_info(worker.handle);
|
||||
|
||||
def dump_computation_graph(file_name, worker=global_worker):
|
||||
ray.lib.dump_computation_graph(worker.handle, file_name)
|
||||
|
||||
def task_info(worker=global_worker):
|
||||
"""Tell the scheduler to return task information. Currently includes a list of all failed tasks since the start of the cluster."""
|
||||
return ray.lib.task_info(worker.handle);
|
||||
|
|
34
protos/graph.proto
Normal file
34
protos/graph.proto
Normal file
|
@ -0,0 +1,34 @@
|
|||
syntax = "proto3";
|
||||
|
||||
import "types.proto";
|
||||
|
||||
message Task {
|
||||
string name = 1; // Name of the function call
|
||||
repeated Value arg = 2; // List of arguments, can be either object references or protobuf descriptions of object passed by value
|
||||
repeated uint64 result = 3; // Object references for result
|
||||
}
|
||||
|
||||
message Put {
|
||||
uint64 objref = 1; // The objref for the object that was put
|
||||
}
|
||||
|
||||
// This is used internally by the scheduler. From the scheduler's perspective,
|
||||
// the submission of tasks (via SubmitTask) and the submission of puts (via
|
||||
// PutObj) look very similar, and so it is useful to be able to handle them
|
||||
// together (for example in the computation graph).
|
||||
message Operation {
|
||||
Task task = 1;
|
||||
Put put = 2;
|
||||
uint64 creator_operationid = 3; // The id of the task that called this task or put.
|
||||
}
|
||||
|
||||
message TaskStatus {
|
||||
uint64 operationid = 1;
|
||||
string function_name = 2;
|
||||
string worker_address = 3;
|
||||
string error_message = 4;
|
||||
}
|
||||
|
||||
message CompGraph {
|
||||
repeated Operation operation = 1;
|
||||
}
|
|
@ -12,6 +12,7 @@
|
|||
|
||||
syntax = "proto3";
|
||||
|
||||
import "graph.proto";
|
||||
import "types.proto";
|
||||
|
||||
// Scheduler
|
||||
|
@ -153,6 +154,7 @@ message SchedulerInfoReply {
|
|||
map<string, FnTableEntry> function_table = 2; // Table of all available remote function
|
||||
repeated uint64 target_objref = 4; // The target_objrefs_ data structure
|
||||
repeated uint64 reference_count = 5; // The reference_counts_ data structure
|
||||
CompGraph computation_graph = 6; // The computation graph constructed so far
|
||||
}
|
||||
|
||||
// Object stores
|
||||
|
|
|
@ -65,33 +65,6 @@ message Value {
|
|||
Obj obj = 2; // For pass by value
|
||||
}
|
||||
|
||||
message Task {
|
||||
string name = 1; // Name of the function call
|
||||
repeated Value arg = 2; // List of arguments, can be either object references or protobuf descriptions of object passed by value
|
||||
repeated uint64 result = 3; // Object references for result
|
||||
}
|
||||
|
||||
message Put {
|
||||
uint64 objref = 1; // The objref for the object that was put
|
||||
}
|
||||
|
||||
// This is used internally by the scheduler. From the scheduler's perspective,
|
||||
// the submission of tasks (via SubmitTask) and the submission of puts (via
|
||||
// PutObj) look very similar, and so it is useful to be able to handle them
|
||||
// together (for example in the computation graph).
|
||||
message Operation {
|
||||
Task task = 1;
|
||||
Put put = 2;
|
||||
uint64 creator_operationid = 3; // The id of the task that called this task or put.
|
||||
}
|
||||
|
||||
message TaskStatus {
|
||||
uint64 operationid = 1;
|
||||
string function_name = 2;
|
||||
string worker_address = 3;
|
||||
string error_message = 4;
|
||||
}
|
||||
|
||||
message Array {
|
||||
repeated uint64 shape = 1;
|
||||
sint64 dtype = 2;
|
||||
|
|
7
scripts/gen-protobuf.sh
Executable file
7
scripts/gen-protobuf.sh
Executable file
|
@ -0,0 +1,7 @@
|
|||
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
|
||||
# TODO(mehrdad): How would this look in windows, where does the protoc executable go?
|
||||
# On Linux, we compile it ourselves, on Windows we might not want to do that (?)
|
||||
mkdir -p $DIR/../lib/python/ray/internal/
|
||||
$DIR/../thirdparty/grpc/bins/opt/protobuf/protoc -I ../protos/ --python_out=$DIR/../lib/python/ray/internal/ ../protos/graph.proto
|
||||
$DIR/../thirdparty/grpc/bins/opt/protobuf/protoc -I ../protos/ --python_out=$DIR/../lib/python/ray/internal/ ../protos/types.proto
|
|
@ -19,3 +19,9 @@ const Task& ComputationGraph::get_task(OperationId operationid) {
|
|||
RAY_CHECK(operations_[operationid]->has_task(), "Calling get_task with operationid " << operationid << ", but this corresponds to a put not a task.");
|
||||
return operations_[operationid]->task();
|
||||
}
|
||||
|
||||
void ComputationGraph::to_protobuf(CompGraph* computation_graph) {
|
||||
for (OperationId id = 0; id < operations_.size(); ++id) {
|
||||
computation_graph->add_operation()->CopyFrom(*operations_[id]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,8 @@
|
|||
#include <limits>
|
||||
|
||||
#include "ray/ray.h"
|
||||
#include "ray.grpc.pb.h"
|
||||
|
||||
#include "graph.pb.h"
|
||||
#include "types.pb.h"
|
||||
|
||||
// used to represent the root operation (that is, the driver code)
|
||||
|
@ -21,6 +22,8 @@ public:
|
|||
// Return the task corresponding to a particular OperationId. If operationid
|
||||
// corresponds to a put, then fail.
|
||||
const Task& get_task(OperationId operationid);
|
||||
// Serialize the computation graph to ProtoBuf and store it in computation_graph
|
||||
void to_protobuf(CompGraph* computation_graph);
|
||||
private:
|
||||
// maps an OperationId to the corresponding task or put
|
||||
std::vector<std::unique_ptr<Operation> > operations_;
|
||||
|
|
|
@ -824,6 +824,21 @@ PyObject* task_info(PyObject* self, PyObject* args) {
|
|||
return dict;
|
||||
}
|
||||
|
||||
PyObject* dump_computation_graph(PyObject* self, PyObject* args) {
|
||||
Worker* worker;
|
||||
const char* output_file_name;
|
||||
if (!PyArg_ParseTuple(args, "O&s", &PyObjectToWorker, &worker, &output_file_name)) {
|
||||
return NULL;
|
||||
}
|
||||
ClientContext context;
|
||||
SchedulerInfoRequest request;
|
||||
SchedulerInfoReply reply;
|
||||
worker->scheduler_info(context, request, reply);
|
||||
std::fstream output(output_file_name, std::ios::out | std::ios::trunc | std::ios::binary);
|
||||
RAY_CHECK(reply.computation_graph().SerializeToOstream(&output), "Cannot dump computation graph to file " << output_file_name);
|
||||
Py_RETURN_NONE;
|
||||
}
|
||||
|
||||
PyObject* set_log_config(PyObject* self, PyObject* args) {
|
||||
const char* log_file_name;
|
||||
if (!PyArg_ParseTuple(args, "s", &log_file_name)) {
|
||||
|
@ -859,6 +874,7 @@ static PyMethodDef RayLibMethods[] = {
|
|||
{ "start_worker_service", start_worker_service, METH_VARARGS, "start the worker service" },
|
||||
{ "scheduler_info", scheduler_info, METH_VARARGS, "get info about scheduler state" },
|
||||
{ "task_info", task_info, METH_VARARGS, "get task statuses" },
|
||||
{ "dump_computation_graph", dump_computation_graph, METH_VARARGS, "dump the current computation graph to a file" },
|
||||
{ "set_log_config", set_log_config, METH_VARARGS, "set filename for raylib logging" },
|
||||
{ NULL, NULL, 0, NULL }
|
||||
};
|
||||
|
|
|
@ -494,6 +494,7 @@ void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerIn
|
|||
for (const WorkerId& entry : avail_workers_) {
|
||||
reply->add_avail_worker(entry);
|
||||
}
|
||||
computation_graph_.to_protobuf(reply->mutable_computation_graph());
|
||||
release_all_locks();
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue