// This file defines the GRPC interface between scheduler, object stores and
// workers. These are used for communication over the network.
//
// Terminology:
//   Worker: A cluster consists of multiple worker processes (typically one
//     per core) which execute tasks that can access objects from object
//     stores.
//   Object store: Typically there is one object store per node which holds
//     the objects locally stored on that node.
//   Scheduler: The scheduler process keeps track of a mapping from object
//     IDs to object stores, orchestrates data transfer between object
//     stores and assigns tasks to workers.

syntax = "proto3";

import "graph.proto";
import "types.proto";

// Scheduler

// RPCs served by the scheduler process.
service Scheduler {
  // Register a new worker with the scheduler.
  rpc RegisterWorker(RegisterWorkerRequest) returns (RegisterWorkerReply);

  // Register an object store with the scheduler.
  rpc RegisterObjStore(RegisterObjStoreRequest) returns (RegisterObjStoreReply);

  // Tell the scheduler that a worker can execute a certain function.
  rpc RegisterFunction(RegisterFunctionRequest) returns (AckReply);

  // Asks the scheduler to execute a task, immediately returns an object ID to
  // the result.
  rpc SubmitTask(SubmitTaskRequest) returns (SubmitTaskReply);

  // Increment the count of the object ID.
  rpc IncrementCount(ChangeCountRequest) returns (AckReply);

  // Decrement the count of the object ID.
  rpc DecrementCount(ChangeCountRequest) returns (AckReply);

  // Request an object ID for an object that will be put in an object store.
  rpc PutObj(PutObjRequest) returns (PutObjReply);

  // Request delivery of an object from an object store that holds the object
  // to the local object store.
  rpc RequestObj(RequestObjRequest) returns (AckReply);

  // Used by the worker to tell the scheduler that two objectids should refer
  // to the same object.
  rpc AliasObjectIDs(AliasObjectIDsRequest) returns (AckReply);

  // Used by an object store to tell the scheduler that an object is ready
  // (i.e. has been finalized and can be shared).
  rpc ObjReady(ObjReadyRequest) returns (AckReply);

  // Increments the reference count of a particular object ID.
  rpc IncrementRefCount(IncrementRefCountRequest) returns (AckReply);

  // Decrements the reference count of a particular object ID.
  rpc DecrementRefCount(DecrementRefCountRequest) returns (AckReply);

  // Used by the worker to notify the scheduler about which objectids a
  // particular object contains.
  rpc AddContainedObjectIDs(AddContainedObjectIDsRequest) returns (AckReply);

  // Used by the worker to ask for work. The request also reports the status
  // of the previous task, if there was one.
  rpc ReadyForNewTask(ReadyForNewTaskRequest) returns (AckReply);

  // Get information about the scheduler state.
  rpc SchedulerInfo(SchedulerInfoRequest) returns (SchedulerInfoReply);

  // Get information about tasks.
  rpc TaskInfo(TaskInfoRequest) returns (TaskInfoReply);

  // Kills the workers.
  rpc KillWorkers(KillWorkersRequest) returns (KillWorkersReply);

  // Exports function to the workers.
  rpc ExportFunction(ExportFunctionRequest) returns (ExportFunctionReply);

  // Ship an initializer and reinitializer for a reusable variable to the
  // workers.
  rpc ExportReusableVariable(ExportReusableVariableRequest) returns (AckReply);
}

// Empty acknowledgement shared by the RPCs in this file that return no data.
message AckReply {
}

// Request for Scheduler.RegisterWorker.
message RegisterWorkerRequest {
  // The IP address of the node the worker is running on.
  string node_ip_address = 1;

  // The address of the object store the worker should connect to. If omitted,
  // this will be assigned by the scheduler.
  string objstore_address = 2;

  // True if the worker is a driver, and false otherwise.
  bool is_driver = 3;
}

// Reply for Scheduler.RegisterWorker.
message RegisterWorkerReply {
  // Worker ID assigned by the scheduler.
  uint64 workerid = 1;

  // The Object store ID of the worker's local object store.
  uint64 objstoreid = 2;

  // IP address of the worker being registered.
  string worker_address = 3;

  // IP address of the object store the worker should connect to.
  string objstore_address = 4;
}

// Request for Scheduler.RegisterObjStore.
message RegisterObjStoreRequest {
  // IP address of the object store being registered.
  string objstore_address = 1;
}

// Reply for Scheduler.RegisterObjStore.
message RegisterObjStoreReply {
  // Object store ID assigned by the scheduler.
  uint64 objstoreid = 1;
}

// Request for Scheduler.RegisterFunction.
message RegisterFunctionRequest {
  // Worker that can execute the function.
  uint64 workerid = 1;

  // Name of the function that is registered.
  string fnname = 2;

  // Number of return values of the function.
  uint64 num_return_vals = 3;
}

// Request for Scheduler.SubmitTask.
message SubmitTaskRequest {
  // The ID of the worker submitting the task.
  uint64 workerid = 1;

  // Contains name of the function to be executed and arguments.
  Task task = 2;
}

// Reply for Scheduler.SubmitTask.
message SubmitTaskReply {
  // Object IDs of the function return values.
  repeated uint64 result = 1;

  // True if the function was registered; false otherwise.
  bool function_registered = 2;
}

// Request for Scheduler.RequestObj.
message RequestObjRequest {
  // Worker that tries to request the object.
  uint64 workerid = 1;

  // Object ID of the object being requested.
  uint64 objectid = 2;
}

// Request for Scheduler.PutObj.
message PutObjRequest {
  // Worker that tries to put an object.
  uint64 workerid = 1;
}

// Reply for Scheduler.PutObj.
message PutObjReply {
  // Object ID assigned by the scheduler to the object.
  uint64 objectid = 1;
}

// Request for Scheduler.AliasObjectIDs.
message AliasObjectIDsRequest {
  // ObjectID which will be aliased.
  uint64 alias_objectid = 1;

  // The target ObjectID.
  uint64 target_objectid = 2;
}

// Request for Scheduler.ObjReady.
message ObjReadyRequest {
  // Object ID of the object that has been finalized.
  uint64 objectid = 1;

  // ID of the object store the object lives on.
  uint64 objstoreid = 2;
}

// Request for Scheduler.IncrementRefCount.
message IncrementRefCountRequest {
  // Object IDs whose reference count should be incremented. Duplicates will
  // be incremented multiple times.
  repeated uint64 objectid = 1;
}

// Request for Scheduler.AddContainedObjectIDs.
message AddContainedObjectIDsRequest {
  // The objectid of the object in question.
  uint64 objectid = 1;

  // Object IDs contained in the object.
  repeated uint64 contained_objectid = 2;
}

// Request for Scheduler.DecrementRefCount.
message DecrementRefCountRequest {
  // Object IDs whose reference count should be decremented. Duplicates will
  // be decremented multiple times.
  repeated uint64 objectid = 1;
}

// Request for Scheduler.ReadyForNewTask.
message ReadyForNewTaskRequest {
  // Outcome of the task the worker executed previously.
  message PreviousTaskInfo {
    // True if the task succeeded, false if it threw an exception.
    bool task_succeeded = 1;

    // The contents of the exception, if the task threw an exception.
    string error_message = 2;
  }

  // ID of the worker which executed the task.
  uint64 workerid = 1;

  // Information about the previous task, this is only present if there was a
  // previous task.
  PreviousTaskInfo previous_task_info = 2;
}

// Request shared by Scheduler.IncrementCount and Scheduler.DecrementCount.
message ChangeCountRequest {
  // Object ID of the object whose reference count is increased or decreased.
  uint64 objectid = 1;
}

// The following messages are used to get information about the scheduler
// state.

// Request for Scheduler.SchedulerInfo.
message SchedulerInfoRequest {
}

// Value type of SchedulerInfoReply.function_table.
message FnTableEntry {
  // ID of the worker that can execute the function.
  repeated uint64 workerid = 1;

  // Number of return values of the function.
  uint64 num_return_vals = 2;
}

// Reply for Scheduler.SchedulerInfo.
message SchedulerInfoReply {
  // OperationIds of the tasks on the task queue.
  repeated uint64 operationid = 1;

  // List of workers waiting to get a task assigned.
  repeated uint64 avail_worker = 3;

  // Table of all available remote functions.
  // NOTE(review): the map type parameters were missing from this declaration;
  // restored as <string, FnTableEntry>, presumably keyed by function name --
  // confirm against the scheduler implementation.
  map<string, FnTableEntry> function_table = 2;

  // The target_objectids_ data structure.
  repeated uint64 target_objectid = 4;

  // The reference_counts_ data structure.
  repeated uint64 reference_count = 5;

  // The computation graph constructed so far.
  CompGraph computation_graph = 6;
}

// Object stores

// RPCs served by each object store.
service ObjStore {
  // Tell the object store to begin getting an object from another object
  // store (called by the scheduler).
  rpc StartDelivery(StartDeliveryRequest) returns (AckReply);

  // Transfer an object between object stores: the reply is a stream of chunks
  // of the object named in the request.
  rpc StreamObjTo(StreamObjToRequest) returns (stream ObjChunk);

  // Notify the object store about objectid aliasing. This is called by the
  // scheduler.
  rpc NotifyAlias(NotifyAliasRequest) returns (AckReply);

  // Tell the object store to deallocate an object held by the object store.
  // This is called by the scheduler.
  rpc DeallocateObject(DeallocateObjectRequest) returns (AckReply);

  // Get info about the object store state.
  rpc ObjStoreInfo(ObjStoreInfoRequest) returns (ObjStoreInfoReply);
}

// Request for ObjStore.StartDelivery.
message StartDeliveryRequest {
  // Object store to get the object from.
  string objstore_address = 1;

  // ID of object that gets delivered.
  uint64 objectid = 2;
}

// Not referenced by any service defined in this file.
message RegisterObjRequest {
  // ID of object that gets registered.
  uint64 objectid = 1;
}

// Not referenced by any service defined in this file.
message RegisterObjReply {
  // Handle to memory segment where object is stored.
  uint64 handle = 1;
}

// Request for ObjStore.StreamObjTo.
message StreamObjToRequest {
  // Object ID of the object being streamed.
  uint64 objectid = 1;
}

// One element of the stream returned by ObjStore.StreamObjTo.
message ObjChunk {
  // Total size of the object.
  uint64 total_size = 1;

  // Offset of the arrow metadata.
  uint64 metadata_offset = 2;

  // Data for this chunk of the object.
  bytes data = 3;
}

// Request for ObjStore.NotifyAlias.
message NotifyAliasRequest {
  // The objectid being aliased.
  uint64 alias_objectid = 1;

  // The canonical objectid that points to the actual object.
  uint64 canonical_objectid = 2;
}

// Request for ObjStore.DeallocateObject.
message DeallocateObjectRequest {
  // The canonical objectid of the object to deallocate.
  uint64 canonical_objectid = 1;
}

// Not referenced by any service defined in this file.
message GetObjRequest {
  // Object ID of the object being requested by the worker.
  uint64 objectid = 1;
}

// Request for Scheduler.TaskInfo.
message TaskInfoRequest {
}

// Reply for Scheduler.TaskInfo.
// TODO(mehrdadn): We'll want to return information from computation_graph
// since it's important for visualizing tasks that have been completed etc.
message TaskInfoReply {
  // Status entries for failed tasks (inferred from the field name).
  repeated TaskStatus failed_task = 1;

  // Status entries for running tasks (inferred from the field name).
  repeated TaskStatus running_task = 2;

  // Number of succeeded tasks (inferred from the field name).
  uint64 num_succeeded = 3;
}

// Request for Scheduler.KillWorkers.
message KillWorkersRequest {
}

// Reply for Scheduler.KillWorkers.
message KillWorkersReply {
  // Currently, the only reason to fail is if there are workers still
  // executing tasks.
  bool success = 1;
}

// Request for Scheduler.ExportFunction.
message ExportFunctionRequest {
  // The function to export.
  Function function = 1;
}

// Reply for Scheduler.ExportFunction.
message ExportFunctionReply {
}

// Request for Scheduler.ExportReusableVariable.
message ExportReusableVariableRequest {
  // The reusable variable to export.
  ReusableVar reusable_variable = 1;
}

// These messages are for getting information about the object store state.

// Request for ObjStore.ObjStoreInfo.
message ObjStoreInfoRequest {
  // Object IDs we want to retrieve from the store for inspection.
  repeated uint64 objectid = 1;
}

// Reply for ObjStore.ObjStoreInfo.
message ObjStoreInfoReply {
  // List of object IDs in the store.
  repeated uint64 objectid = 1;

  // Protocol buffer objects that were requested.
  repeated Obj obj = 2;
}

// Workers

// RPCs served by each worker.
service WorkerService {
  // Scheduler calls a function from the worker.
  rpc ExecuteTask(ExecuteTaskRequest) returns (ExecuteTaskReply);

  // Scheduler imports a function into the worker.
  rpc ImportFunction(ImportFunctionRequest) returns (ImportFunctionReply);

  // Scheduler imports a reusable variable into the worker.
  rpc ImportReusableVariable(ImportReusableVariableRequest) returns (AckReply);

  // Kills this worker.
  rpc Die(DieRequest) returns (DieReply);
}

// Request for WorkerService.ExecuteTask.
message ExecuteTaskRequest {
  // Contains name of the function to be executed and arguments.
  Task task = 1;
}

// Reply for WorkerService.ExecuteTask.
message ExecuteTaskReply {
}

// Request for WorkerService.ImportFunction.
message ImportFunctionRequest {
  // The function to import.
  Function function = 1;
}

// Reply for WorkerService.ImportFunction.
message ImportFunctionReply {
}

// Request for WorkerService.ImportReusableVariable.
message ImportReusableVariableRequest {
  // The reusable variable to import.
  ReusableVar reusable_variable = 1;
}

// Request for WorkerService.Die.
message DieRequest {
}

// Reply for WorkerService.Die.
message DieReply {
}

// This message is used by the worker service to send messages to the worker
// that are processed by the worker's main loop.
message WorkerMessage {
  oneof worker_item {
    // A task for the worker to execute.
    Task task = 1;

    // A remote function to import on the worker.
    Function function = 2;

    // A reusable variable to import on the worker.
    ReusableVar reusable_variable = 3;
  }
}