document and clean up orchestra protocol buffer specification

This commit is contained in:
Philipp Moritz 2016-03-25 13:46:12 -07:00
parent 16d91af7b8
commit 4f5a637a8f
4 changed files with 139 additions and 122 deletions

View file

@ -1,82 +1,20 @@
// This file defines the GRPC interface between scheduler, object stores and
// workers. These are used for communication over the network.
// Terminology:
// Worker: A cluster consists of multiple worker processes (typically one
// per core) which execute tasks that can access objects from object stores.
// Object store: Typically there is one object store per node which holds the
// objects locally stored on that node.
// Scheduler: The scheduler process keeps track of a mapping from object
// references to object stores, orchestrates data transfer between object
// stores and assigns tasks to workers.
syntax = "proto3"; syntax = "proto3";
import "types.proto"; import "types.proto";
message AckReply { // Scheduler
string errormsg = 1;
}
message RegisterWorkerRequest {
string worker_address = 1;
string objstore_address = 2;
}
message RegisterWorkerReply {
uint64 workerid = 1;
}
message RegisterObjStoreRequest {
string address = 1;
}
message RegisterObjStoreReply {
uint64 objstoreid = 1;
}
message RegisterFunctionRequest {
uint64 workerid = 1;
string fnname = 2;
uint64 num_return_vals = 3;
}
message RemoteCallRequest {
Call call = 1;
}
message RemoteCallReply {
repeated uint64 result = 1;
}
message PullObjRequest {
uint64 workerid = 1;
uint64 objref = 2;
}
message PushObjRequest {
uint64 workerid = 1;
}
message PushObjReply {
uint64 objref = 1;
}
message ObjReadyRequest {
uint64 objref = 1;
uint64 objstoreid = 2;
}
message WorkerReadyRequest {
uint64 workerid = 1;
}
message ChangeCountRequest {
uint64 objref = 1;
}
message SchedulerDebugInfoRequest {
bool do_scheduling = 1;
}
message FnTableEntry {
repeated uint64 workerid = 1;
uint64 num_return_vals = 2;
}
message SchedulerDebugInfoReply {
repeated Call task = 1;
repeated uint64 avail_worker = 3;
map<string, FnTableEntry> function_table = 2;
}
service Scheduler { service Scheduler {
// Register a new worker with the scheduler // Register a new worker with the scheduler
@ -87,77 +25,159 @@ service Scheduler {
rpc RegisterFunction(RegisterFunctionRequest) returns (AckReply); rpc RegisterFunction(RegisterFunctionRequest) returns (AckReply);
// Asks the scheduler to execute a task, immediately returns an object reference to the result // Asks the scheduler to execute a task, immediately returns an object reference to the result
rpc RemoteCall(RemoteCallRequest) returns (RemoteCallReply); rpc RemoteCall(RemoteCallRequest) returns (RemoteCallReply);
// increment the count of the object reference // Increment the count of the object reference
rpc IncrementCount(ChangeCountRequest) returns (AckReply); rpc IncrementCount(ChangeCountRequest) returns (AckReply);
// decrement the count of the object reference // Decrement the count of the object reference
rpc DecrementCount(ChangeCountRequest) returns (AckReply); rpc DecrementCount(ChangeCountRequest) returns (AckReply);
// request an object reference for an object that will be pushed to an object store // Request an object reference for an object that will be pushed to an object store
rpc PushObj(PushObjRequest) returns (PushObjReply); rpc PushObj(PushObjRequest) returns (PushObjReply);
// request delivery of an object // Request delivery of an object from an object store that holds the object to the local object store
rpc PullObj(PullObjRequest) returns (AckReply); rpc PullObj(PullObjRequest) returns (AckReply);
// used by an object store to tell the scheduler that an object is ready // Used by an object store to tell the scheduler that an object is ready (i.e. has been finalized and can be shared)
rpc ObjReady(ObjReadyRequest) returns (AckReply); rpc ObjReady(ObjReadyRequest) returns (AckReply);
// used by the worker to report back and ask for more work // Used by the worker to report back and ask for more work
rpc WorkerReady(WorkerReadyRequest) returns (AckReply); rpc WorkerReady(WorkerReadyRequest) returns (AckReply);
// get debugging information from the scheduler // Get debugging information from the scheduler
rpc SchedulerDebugInfo(SchedulerDebugInfoRequest) returns (SchedulerDebugInfoReply); rpc SchedulerDebugInfo(SchedulerDebugInfoRequest) returns (SchedulerDebugInfoReply);
} }
message DeliverObjRequest { message AckReply {
string objstore_address = 1; // objstore to deliver the object to
uint64 objref = 2; // reference of object that gets delivered
} }
message RegisterObjRequest { message RegisterWorkerRequest {
uint64 objref = 1; // reference of object that gets registered string worker_address = 1; // IP address of the worker being registered
string objstore_address = 2; // IP address of the object store the worker is connected to
} }
message RegisterObjReply { message RegisterWorkerReply {
uint64 handle = 1; // handle to memory segment where object is stored uint64 workerid = 1; // Worker ID assigned by the scheduler
} }
message ObjChunk { message RegisterObjStoreRequest {
uint64 objref = 1; string objstore_address = 1; // IP address of the object store being registered
uint64 totalsize = 2;
bytes data = 3;
} }
message GetObjRequest { message RegisterObjStoreReply {
uint64 objref = 1; uint64 objstoreid = 1; // Object store ID assigned by the scheduler
} }
message GetObjReply { message RegisterFunctionRequest {
string bucket = 1; uint64 workerid = 1; // Worker that can execute the function
uint64 handle = 2; string fnname = 2; // Name of the function that is registered
uint64 size = 3; uint64 num_return_vals = 3; // Number of return values of the function
} }
message ObjStoreDebugInfoRequest { message RemoteCallRequest {
repeated uint64 objref = 1; // get protocol buffer objects corresponding to objref Call call = 1; // Contains name of the function to be executed and arguments
} }
message ObjStoreDebugInfoReply { message RemoteCallReply {
repeated uint64 objref = 1; // list of object references in the store repeated uint64 result = 1; // Object references of the function return values
repeated Obj obj = 2; // protocol buffer objects that were requested
} }
message PullObjRequest {
uint64 workerid = 1; // Worker that tries to pull the object
uint64 objref = 2; // Object reference of the object being pulled
}
message PushObjRequest {
uint64 workerid = 1; // Worker that tries to push an object
}
message PushObjReply {
uint64 objref = 1; // Object reference assigned by the scheduler to the object
}
message ObjReadyRequest {
uint64 objref = 1; // Object reference of the object that has been finalized
uint64 objstoreid = 2; // ID of the object store the object lives on
}
message WorkerReadyRequest {
uint64 workerid = 1; // ID of the worker which is ready
}
message ChangeCountRequest {
uint64 objref = 1; // Object reference of the object whose reference count is increased or decreased
}
// The following messages are used for debugging purposes:
message SchedulerDebugInfoRequest {
}
message FnTableEntry {
repeated uint64 workerid = 1; // ID of the worker that can execute the function
uint64 num_return_vals = 2; // Number of return values of the function
}
message SchedulerDebugInfoReply {
repeated Call task = 1; // Tasks on the task queue
repeated uint64 avail_worker = 3; // List of workers waiting to get a task assigned
map<string, FnTableEntry> function_table = 2; // Table of all available remote function
}
// Object stores
service ObjStore { service ObjStore {
// Request to deliver the data that comes with an object reference to another object store // Request to deliver an object to another object store (called by the scheduler)
rpc DeliverObj(DeliverObjRequest) returns (AckReply); rpc DeliverObj(DeliverObjRequest) returns (AckReply);
// Accept incoming data from another object store // Accept incoming data from another object store, as a stream of object chunks
rpc StreamObj(stream ObjChunk) returns (AckReply); rpc StreamObj(stream ObjChunk) returns (AckReply);
// Get debug info from the object store // Get debug info from the object store
rpc ObjStoreDebugInfo(ObjStoreDebugInfoRequest) returns (ObjStoreDebugInfoReply); rpc ObjStoreDebugInfo(ObjStoreDebugInfoRequest) returns (ObjStoreDebugInfoReply);
} }
message DeliverObjRequest {
string objstore_address = 1; // Object store to deliver the object to
uint64 objref = 2; // Reference of object that gets delivered
}
message RegisterObjRequest {
uint64 objref = 1; // Reference of object that gets registered
}
message RegisterObjReply {
uint64 handle = 1; // Handle to memory segment where object is stored
}
message ObjChunk {
uint64 objref = 1; // Object reference of the object being streamed
uint64 totalsize = 2; // Total size of the object
bytes data = 3; // Data for this chunk of the object
}
message GetObjRequest {
uint64 objref = 1; // Object reference of the object being requested by the worker
}
message GetObjReply {
string bucket = 1; // Name of the shared memory segment where the object is stored
uint64 handle = 2; // Shared memory pointer to the object
uint64 size = 3; // Total size of the object in bytes
}
// These messages are for debugging purposes:
message ObjStoreDebugInfoRequest {
repeated uint64 objref = 1; // Object references we want to retrieve from the store for inspection
}
message ObjStoreDebugInfoReply {
repeated uint64 objref = 1; // List of object references in the store
repeated Obj obj = 2; // Protocol buffer objects that were requested
}
// Workers
service WorkerService {
rpc InvokeCall(InvokeCallRequest) returns (InvokeCallReply); // Scheduler calls a function from the worker
}
message InvokeCallRequest { message InvokeCallRequest {
Call call = 1; Call call = 1; // Contains name of the function to be executed and arguments
} }
message InvokeCallReply { message InvokeCallReply {
} }
service WorkerService {
rpc InvokeCall(InvokeCallRequest) returns (InvokeCallReply);
}

View file

@ -46,14 +46,14 @@ message Dict {
} }
message Value { message Value {
uint64 ref = 1; // for pass by reference uint64 ref = 1; // For pass by reference
Obj obj = 2; // for pass by value Obj obj = 2; // For pass by value
} }
message Call { message Call {
string name = 1; string name = 1; // Name of the function call
repeated Value arg = 2; repeated Value arg = 2; // List of arguments, can be either object references or protobuf descriptions of object passed by value
repeated uint64 result = 3; // object references for result repeated uint64 result = 3; // Object references for result
} }
message Array { message Array {

View file

@ -27,7 +27,7 @@ ObjStoreService::ObjStoreService(const std::string& objstore_address, std::share
recv_queue_.connect(std::string("queue:") + objstore_address + std::string(":obj"), true); recv_queue_.connect(std::string("queue:") + objstore_address + std::string(":obj"), true);
ClientContext context; ClientContext context;
RegisterObjStoreRequest request; RegisterObjStoreRequest request;
request.set_address(objstore_address); request.set_objstore_address(objstore_address);
RegisterObjStoreReply reply; RegisterObjStoreReply reply;
scheduler_stub_->RegisterObjStore(&context, request, &reply); scheduler_stub_->RegisterObjStore(&context, request, &reply);
objstoreid_ = reply.objstoreid(); objstoreid_ = reply.objstoreid();

View file

@ -59,9 +59,9 @@ Status SchedulerService::PullObj(ServerContext* context, const PullObjRequest* r
Status SchedulerService::RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) { Status SchedulerService::RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) {
std::lock_guard<std::mutex> objstore_lock(objstores_lock_); std::lock_guard<std::mutex> objstore_lock(objstores_lock_);
ObjStoreId objstoreid = objstores_.size(); ObjStoreId objstoreid = objstores_.size();
auto channel = grpc::CreateChannel(request->address(), grpc::InsecureChannelCredentials()); auto channel = grpc::CreateChannel(request->objstore_address(), grpc::InsecureChannelCredentials());
objstores_.push_back(ObjStoreHandle()); objstores_.push_back(ObjStoreHandle());
objstores_[objstoreid].address = request->address(); objstores_[objstoreid].address = request->objstore_address();
objstores_[objstoreid].channel = channel; objstores_[objstoreid].channel = channel;
objstores_[objstoreid].objstore_stub = ObjStore::NewStub(channel); objstores_[objstoreid].objstore_stub = ObjStore::NewStub(channel);
reply->set_objstoreid(objstoreid); reply->set_objstoreid(objstoreid);
@ -263,9 +263,6 @@ void SchedulerService::register_function(const std::string& name, WorkerId worke
} }
void SchedulerService::debug_info(const SchedulerDebugInfoRequest& request, SchedulerDebugInfoReply* reply) { void SchedulerService::debug_info(const SchedulerDebugInfoRequest& request, SchedulerDebugInfoReply* reply) {
if (request.do_scheduling()) {
schedule();
}
fntable_lock_.lock(); fntable_lock_.lock();
auto function_table = reply->mutable_function_table(); auto function_table = reply->mutable_function_table();
for (const auto& entry : fntable_) { for (const auto& entry : fntable_) {