fix includes and first steps to get call working

This commit is contained in:
Philipp Moritz 2016-02-22 16:06:16 -08:00
parent dcb73aba90
commit bade870a38
8 changed files with 44 additions and 26 deletions

View file

@ -10,7 +10,7 @@ try:
except:
import pickle
cdef extern from "types.pb.h":
cdef extern from "../../../build/generated/types.pb.h":
ctypedef enum DataType:
INT32
INT64

View file

@ -24,16 +24,15 @@ cdef extern from "Python.h":
int PyByteArray_Resize(object self, Py_ssize_t size) except -1
char* PyByteArray_AS_STRING(object bytearray)
cdef extern from "types.pb.h":
# cdef extern from "../../../build/generated/orchestra.pb.h":
# cdef cppclass RemoteCallRequest:
# RemoteCallRequest()
# void set_name(const char* value)
# Call* mutable_call()
cdef extern from "../../../build/generated/types.pb.h":
cdef cppclass Values
cdef extern from "orchestra.pb.h":
cdef cppclass RemoteCallRequest:
RemoteCallRequest()
void set_name(const char* value)
Values* mutable_arg()
cdef extern from "types.pb.h":
ctypedef enum DataType:
INT32
INT64
@ -140,13 +139,13 @@ cdef class Worker:
def connect(self, server_addr, worker_addr, objstore_addr):
self.context = orch_create_context(server_addr, worker_addr, objstore_addr)
# cpdef call(self, name, args):
# cdef RemoteCallRequest* result = new RemoteCallRequest()
# result[0].set_name(name)
# unison.serialize_args_into(args, <uintptr_t>result[0].mutable_arg())
# for i in range(10):
# orch_remote_call(self.context, result)
# # return <uintptr_t>result
# cpdef call(self, name, args):
# cdef RemoteCallRequest* result = new RemoteCallRequest()
# result[0].set_name(name)
# unison.serialize_args_into(args, <uintptr_t>result[0].mutable_arg())
# for i in range(10):
# orch_remote_call(self.context, result)
# # return <uintptr_t>result
cpdef do_call(self, ptr):
return orch_remote_call(self.context, <void*>ptr)

View file

@ -63,7 +63,8 @@ message FnTableEntry {
}
message GetDebugInfoReply {
map<string, FnTableEntry> function_table = 1;
repeated Call task = 1;
map<string, FnTableEntry> function_table = 2;
}
service SchedulerServer {

View file

@ -25,5 +25,5 @@ slice orch_get_serialized_obj(Worker* worker, ObjRef objref) {
}
void orch_register_function(Worker* worker, const char* name, size_t num_return_vals) {
// worker->register_function(std::string(name), num_return_vals);
worker->register_function(std::string(name), num_return_vals);
}

View file

@ -2,6 +2,7 @@
#define ORCHESTRA_SCHEDULER_H
#include <deque>
#include <memory>
#include <grpc++/grpc++.h>
@ -51,11 +52,11 @@ public:
// returns number of return values of task
size_t add_task(const Call& task) {
fntable_lock_.lock();
size_t num_return_vals = 2; // fn_table_[task.name()].num_return_vals();
size_t num_return_vals = fntable_[task.name()].num_return_vals();
fntable_lock_.unlock();
// std::unique_ptr<Call> task_ptr(new Call(task)); // TODO: perform copy outside
std::unique_ptr<Call> task_ptr(new Call(task)); // TODO: perform copy outside
tasks_lock_.lock();
// tasks_.push_back(task_ptr);
tasks_.emplace_back(std::move(task_ptr));
tasks_lock_.unlock();
return num_return_vals;
}
@ -131,15 +132,21 @@ public:
info.add_worker(workerid);
fntable_lock_.unlock();
}
/*
void debug_info(DebugInfoReply* debug_info) {
void debug_info(GetDebugInfoReply* debug_info) {
fntable_lock_.lock();
for (const auto& entry : fntable_) {
debug_info->
auto function_table = debug_info->mutable_function_table();
(*function_table)[entry.first].set_num_return_vals(entry.second.num_return_vals());
// TODO: set workerid
}
fntable_lock_.lock();
fntable_lock_.unlock();
tasks_lock_.lock();
for (const auto& entry : tasks_) {
Call* call = debug_info->add_task();
call->CopyFrom(*entry);
}
tasks_lock_.unlock();
}
*/
};
#endif

View file

@ -38,6 +38,7 @@ public:
return Status::OK;
}
Status GetDebugInfo(ServerContext* context, const GetDebugInfoRequest* request, GetDebugInfoReply* reply) override {
scheduler_->debug_info(reply);
return Status::OK;
}
};

View file

@ -121,6 +121,8 @@ class Worker {
void register_function(const std::string& name, size_t num_return_vals) {
ClientContext context;
RegisterFunctionRequest request;
request.set_fnname(name);
request.set_num_return_vals(num_return_vals);
AckReply reply;
scheduler_stub_->RegisterFunction(&context, request, &reply);
}

View file

@ -80,6 +80,14 @@ class SchedulerTest(unittest.TestCase):
w = worker.Worker()
w.connect("127.0.0.1:22221", "127.0.0.1:40002", "127.0.0.1:22222")
w.register_function("hello_world", 2)
reply = scheduler_stub.GetDebugInfo(orchestra_pb2.GetDebugInfoRequest(), TIMEOUT_SECONDS)
self.assertEqual(reply.function_table.items()[0][0], u'hello_world')
def testCall(self):
scheduler_channel = implementations.insecure_channel('localhost', 22221)
scheduler_stub = orchestra_pb2.beta_create_SchedulerServer_stub(scheduler_channel)
w = worker.Worker()
w.connect("127.0.0.1:22221", "127.0.0.1:40003", "127.0.0.1:22222")
"""