* implement objref aliasing

* updates
This commit is contained in:
Robert Nishihara 2016-04-08 12:58:08 -07:00 committed by Philipp Moritz
parent 34e9c1778b
commit a28920bb24
13 changed files with 441 additions and 96 deletions

View file

@ -22,13 +22,22 @@ class Worker(object):
orchpy.lib.put_object(self.handle, objref, object_capsule) orchpy.lib.put_object(self.handle, objref, object_capsule)
def get_object(self, objref): def get_object(self, objref):
"""Return the value from the local object store for objref `objref`. This will block until the value for `objref` has been written to the local object store.""" """
Return the value from the local object store for objref `objref`. This will
block until the value for `objref` has been written to the local object store.
WARNING: get_object can only be called on a canonical objref.
"""
if orchpy.lib.is_arrow(self.handle, objref): if orchpy.lib.is_arrow(self.handle, objref):
return orchpy.lib.get_arrow(self.handle, objref) return orchpy.lib.get_arrow(self.handle, objref)
else: else:
object_capsule = orchpy.lib.get_object(self.handle, objref) object_capsule = orchpy.lib.get_object(self.handle, objref)
return serialization.deserialize(object_capsule) return serialization.deserialize(object_capsule)
def alias_objrefs(self, alias_objref, target_objref):
"""Make `alias_objref` refer to the same object that `target_objref` refers to."""
orchpy.lib.alias_objrefs(self.handle, alias_objref, target_objref)
def register_function(self, function): def register_function(self, function):
"""Notify the scheduler that this worker can execute the function with name `func_name`. Store the function `function` locally.""" """Notify the scheduler that this worker can execute the function with name `func_name`. Store the function `function` locally."""
orchpy.lib.register_function(self.handle, function.func_name, len(function.return_types)) orchpy.lib.register_function(self.handle, function.func_name, len(function.return_types))
@ -105,14 +114,15 @@ def distributed(arg_types, return_types, worker=global_worker):
# helper method, this should not be called by the user # helper method, this should not be called by the user
def check_return_values(function, result): def check_return_values(function, result):
if len(function.return_types) == 1: if len(function.return_types) == 1:
if not isinstance(result, function.return_types[0]): result = (result,)
raise Exception("The @distributed decorator for function {} expects one return value with type {}, but {} returned a {}.".format(function.__name__, function.return_types[0], function.__name__, type(result))) # if not isinstance(result, function.return_types[0]):
# raise Exception("The @distributed decorator for function {} expects one return value with type {}, but {} returned a {}.".format(function.__name__, function.return_types[0], function.__name__, type(result)))
else: else:
if len(result) != len(function.return_types): if len(result) != len(function.return_types):
raise Exception("The @distributed decorator for function {} has {} return values with types {}, but {} returned {} values.".format(function.__name__, len(function.return_types), function.return_types, function.__name__, len(result))) raise Exception("The @distributed decorator for function {} has {} return values with types {}, but {} returned {} values.".format(function.__name__, len(function.return_types), function.return_types, function.__name__, len(result)))
for i in range(len(result)): for i in range(len(result)):
if not isinstance(result[i], function.return_types[i]): if (not isinstance(result[i], function.return_types[i])) and (not isinstance(result[i], orchpy.lib.ObjRef)):
raise Exception("The {}th return value for function {} has type {}, but the @distributed decorator expected a return value of type {}.".format(i, function.__name__, type(result[i]), function.return_types[i])) raise Exception("The {}th return value for function {} has type {}, but the @distributed decorator expected a return value of type {} or an ObjRef.".format(i, function.__name__, type(result[i]), function.return_types[i]))
# helper method, this should not be called by the user # helper method, this should not be called by the user
def check_arguments(function, args): def check_arguments(function, args):
@ -132,7 +142,7 @@ def check_arguments(function, args):
else: else:
assert False, "This code should be unreachable." assert False, "This code should be unreachable."
if type(arg) == orchpy.lib.ObjRef: if isinstance(arg, orchpy.lib.ObjRef):
# TODO(rkn): When we have type information in the ObjRef, do type checking here. # TODO(rkn): When we have type information in the ObjRef, do type checking here.
pass pass
else: else:
@ -161,7 +171,7 @@ def get_arguments_for_execution(function, args, worker=global_worker):
else: else:
assert False, "This code should be unreachable." assert False, "This code should be unreachable."
if type(arg) == orchpy.lib.ObjRef: if isinstance(arg, orchpy.lib.ObjRef):
# get the object from the local object store # get the object from the local object store
print "Getting argument {} for function {}.".format(i, function.__name__) print "Getting argument {} for function {}.".format(i, function.__name__)
argument = worker.get_object(arg) argument = worker.get_object(arg)
@ -178,7 +188,13 @@ def get_arguments_for_execution(function, args, worker=global_worker):
# helper method, this should not be called by the user # helper method, this should not be called by the user
def store_outputs_in_objstore(objrefs, outputs, worker=global_worker): def store_outputs_in_objstore(objrefs, outputs, worker=global_worker):
if len(objrefs) == 1: if len(objrefs) == 1:
worker.put_object(objrefs[0], outputs) outputs = (outputs,)
else:
for i in range(len(objrefs)): for i in range(len(objrefs)):
if isinstance(outputs[i], orchpy.lib.ObjRef):
# An ObjRef is being returned, so we must alias objrefs[i] so that it refers to the same object that outputs[i] refers to
print "Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val)
worker.alias_objrefs(objrefs[i], outputs[i])
pass
else:
worker.put_object(objrefs[i], outputs[i]) worker.put_object(objrefs[i], outputs[i])

View file

@ -33,6 +33,8 @@ service Scheduler {
rpc PushObj(PushObjRequest) returns (PushObjReply); rpc PushObj(PushObjRequest) returns (PushObjReply);
// Request delivery of an object from an object store that holds the object to the local object store // Request delivery of an object from an object store that holds the object to the local object store
rpc RequestObj(RequestObjRequest) returns (AckReply); rpc RequestObj(RequestObjRequest) returns (AckReply);
// Used by the worker to tell the scheduler that two objrefs should refer to the same object
rpc AliasObjRefs(AliasObjRefsRequest) returns (AckReply);
// Used by an object store to tell the scheduler that an object is ready (i.e. has been finalized and can be shared) // Used by an object store to tell the scheduler that an object is ready (i.e. has been finalized and can be shared)
rpc ObjReady(ObjReadyRequest) returns (AckReply); rpc ObjReady(ObjReadyRequest) returns (AckReply);
// Used by the worker to report back and ask for more work // Used by the worker to report back and ask for more work
@ -88,6 +90,11 @@ message PushObjReply {
uint64 objref = 1; // Object reference assigned by the scheduler to the object uint64 objref = 1; // Object reference assigned by the scheduler to the object
} }
message AliasObjRefsRequest {
uint64 alias_objref = 1; // ObjRef which will be aliased
uint64 target_objref = 2; // The target ObjRef
}
message ObjReadyRequest { message ObjReadyRequest {
uint64 objref = 1; // Object reference of the object that has been finalized uint64 objref = 1; // Object reference of the object that has been finalized
uint64 objstoreid = 2; // ID of the object store the object lives on uint64 objstoreid = 2; // ID of the object store the object lives on
@ -124,6 +131,8 @@ service ObjStore {
rpc DeliverObj(DeliverObjRequest) returns (AckReply); rpc DeliverObj(DeliverObjRequest) returns (AckReply);
// Accept incoming data from another object store, as a stream of object chunks // Accept incoming data from another object store, as a stream of object chunks
rpc StreamObj(stream ObjChunk) returns (AckReply); rpc StreamObj(stream ObjChunk) returns (AckReply);
// Notify the object store about objref aliasing. This is called by the scheduler
rpc NotifyAlias(NotifyAliasRequest) returns (AckReply);
// Get debug info from the object store // Get debug info from the object store
rpc ObjStoreDebugInfo(ObjStoreDebugInfoRequest) returns (ObjStoreDebugInfoReply); rpc ObjStoreDebugInfo(ObjStoreDebugInfoRequest) returns (ObjStoreDebugInfoReply);
} }
@ -147,6 +156,11 @@ message ObjChunk {
bytes data = 3; // Data for this chunk of the object bytes data = 3; // Data for this chunk of the object
} }
message NotifyAliasRequest {
uint64 alias_objref = 1; // The objref being aliased
uint64 canonical_objref = 2; // The canonical objref that points to the actual object
}
message GetObjRequest { message GetObjRequest {
uint64 objref = 1; // Object reference of the object being requested by the worker uint64 objref = 1; // Object reference of the object being requested by the worker
} }

View file

@ -84,17 +84,19 @@ private:
// worker requests an allocation from the object store // worker requests an allocation from the object store
// GET: workerid, objref -> objhandle: // GET: workerid, objref -> objhandle:
// worker requests an object from the object store // worker requests an object from the object store
// DONE: workerid, objref -> (): // WORKER_DONE: workerid, objref -> ():
// worker tells the object store that an object has been finalized // worker tells the object store that an object has been finalized
// ALIAS_DONE: objref -> ():
// objstore tells itself that it has finalized something (perhaps an alias)
enum ObjRequestType {ALLOC = 0, GET = 1, DONE = 2}; enum ObjRequestType {ALLOC = 0, GET = 1, WORKER_DONE = 2, ALIAS_DONE};
struct ObjRequest { struct ObjRequest {
WorkerId workerid; // worker that sends the request WorkerId workerid; // worker that sends the request
ObjRequestType type; // do we want to allocate a new object or get a handle? ObjRequestType type; // do we want to allocate a new object or get a handle?
ObjRef objref; // object reference of the object to be returned/allocated ObjRef objref; // object reference of the object to be returned/allocated
int64_t size; // if allocate, that's the size of the object int64_t size; // if allocate, that's the size of the object
int64_t metadata_offset; // if sending 'DONE', that's the location of the metadata relative to the beginning of the object int64_t metadata_offset; // if sending 'WORKER_DONE', that's the location of the metadata relative to the beginning of the object
}; };
typedef size_t SegmentId; // index into a memory segment table typedef size_t SegmentId; // index into a memory segment table

View file

@ -112,10 +112,43 @@ Status ObjStoreService::StreamObj(ServerContext* context, ServerReader<ObjChunk>
} }
*/ */
void ObjStoreService::process_requests() { Status ObjStoreService::NotifyAlias(ServerContext* context, const NotifyAliasRequest* request, AckReply* reply) {
ObjRequest request; // NotifyAlias assumes that the objstore already holds canonical_objref
while (true) { ObjRef alias_objref = request->alias_objref();
recv_queue_.receive(&request); ObjRef canonical_objref = request->canonical_objref();
std::lock_guard<std::mutex> memory_lock(memory_lock_);
if (canonical_objref >= memory_.size()) {
ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not in the objstore.")
}
if (!memory_[canonical_objref].second) {
ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not ready yet in the objstore.")
}
if (alias_objref >= memory_.size()) {
memory_.resize(alias_objref + 1);
}
memory_[alias_objref].first = memory_[canonical_objref].first;
memory_[alias_objref].second = true;
ObjRequest done_request;
done_request.type = ObjRequestType::ALIAS_DONE;
done_request.objref = alias_objref;
recv_queue_.send(&done_request);
return Status::OK;
}
void ObjStoreService::process_objstore_request(const ObjRequest request) {
switch (request.type) {
case ObjRequestType::ALIAS_DONE: {
process_pulls_for_objref(request.objref);
}
break;
default: {
ORCH_LOG(ORCH_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable.");
}
}
}
void ObjStoreService::process_worker_request(const ObjRequest request) {
if (request.workerid >= send_queues_.size()) { if (request.workerid >= send_queues_.size()) {
send_queues_.resize(request.workerid + 1); send_queues_.resize(request.workerid + 1);
} }
@ -148,21 +181,11 @@ void ObjStoreService::process_requests() {
} }
} }
break; break;
case ObjRequestType::DONE: { case ObjRequestType::WORKER_DONE: {
std::pair<ObjHandle, bool>& item = memory_[request.objref]; std::pair<ObjHandle, bool>& item = memory_[request.objref];
item.first.set_metadata_offset(request.metadata_offset); item.first.set_metadata_offset(request.metadata_offset);
item.second = true; item.second = true;
std::lock_guard<std::mutex> pull_queue_lock(pull_queue_lock_); process_pulls_for_objref(request.objref);
for (size_t i = 0; i < pull_queue_.size(); ++i) {
if (pull_queue_[i].second == request.objref) {
ObjHandle& elem = memory_[request.objref].first;
send_queues_[pull_queue_[i].first].send(&item.first);
// Remove the pull task from the queue
std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]);
pull_queue_.pop_back();
i -= 1;
}
}
// Tell the scheduler that the object arrived // Tell the scheduler that the object arrived
// TODO(pcm): put this in a separate thread so we don't have to pay the latency here // TODO(pcm): put this in a separate thread so we don't have to pay the latency here
ClientContext objready_context; ClientContext objready_context;
@ -173,6 +196,52 @@ void ObjStoreService::process_requests() {
scheduler_stub_->ObjReady(&objready_context, objready_request, &objready_reply); scheduler_stub_->ObjReady(&objready_context, objready_request, &objready_reply);
} }
break; break;
default: {
ORCH_LOG(ORCH_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable.");
}
}
}
void ObjStoreService::process_requests() {
// TODO(rkn): Should memory_lock_ be used in this method?
ObjRequest request;
while (true) {
recv_queue_.receive(&request);
switch (request.type) {
case ObjRequestType::ALLOC: {
process_worker_request(request);
}
break;
case ObjRequestType::GET: {
process_worker_request(request);
}
break;
case ObjRequestType::WORKER_DONE: {
process_worker_request(request);
}
break;
case ObjRequestType::ALIAS_DONE: {
process_objstore_request(request);
}
break;
default: {
ORCH_LOG(ORCH_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable.");
}
}
}
}
void ObjStoreService::process_pulls_for_objref(ObjRef objref) {
std::pair<ObjHandle, bool>& item = memory_[objref];
std::lock_guard<std::mutex> pull_queue_lock(pull_queue_lock_);
for (size_t i = 0; i < pull_queue_.size(); ++i) {
if (pull_queue_[i].second == objref) {
ObjHandle& elem = memory_[objref].first;
send_queues_[pull_queue_[i].first].send(&item.first);
// Remove the pull task from the queue
std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]);
pull_queue_.pop_back();
i -= 1;
} }
} }
} }

View file

@ -34,12 +34,16 @@ public:
// Status DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) override; // Status DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) override;
// Status StreamObj(ServerContext* context, ServerReader<ObjChunk>* reader, AckReply* reply) override; // Status StreamObj(ServerContext* context, ServerReader<ObjChunk>* reader, AckReply* reply) override;
Status NotifyAlias(ServerContext* context, const NotifyAliasRequest* request, AckReply* reply) override;
Status ObjStoreDebugInfo(ServerContext* context, const ObjStoreDebugInfoRequest* request, ObjStoreDebugInfoReply* reply) override; Status ObjStoreDebugInfo(ServerContext* context, const ObjStoreDebugInfoRequest* request, ObjStoreDebugInfoReply* reply) override;
void start_objstore_service(); void start_objstore_service();
private: private:
// check if we already connected to the other objstore, if yes, return reference to connection, otherwise connect // check if we already connected to the other objstore, if yes, return reference to connection, otherwise connect
ObjStore::Stub& get_objstore_stub(const std::string& objstore_address); ObjStore::Stub& get_objstore_stub(const std::string& objstore_address);
void process_worker_request(const ObjRequest request);
void process_objstore_request(const ObjRequest request);
void process_requests(); void process_requests();
void process_pulls_for_objref(ObjRef objref);
std::string objstore_address_; std::string objstore_address_;
ObjStoreId objstoreid_; // id of this objectstore in the scheduler object store table ObjStoreId objstoreid_; // id of this objectstore in the scheduler object store table

View file

@ -586,6 +586,7 @@ PyObject* put_object(PyObject* self, PyObject* args) {
} }
PyObject* get_object(PyObject* self, PyObject* args) { PyObject* get_object(PyObject* self, PyObject* args) {
// get_object assumes that objref is a canonical objref
Worker* worker; Worker* worker;
ObjRef objref; ObjRef objref;
if (!PyArg_ParseTuple(args, "O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref)) { if (!PyArg_ParseTuple(args, "O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref)) {
@ -607,6 +608,17 @@ PyObject* request_object(PyObject* self, PyObject* args) {
Py_RETURN_NONE; Py_RETURN_NONE;
} }
PyObject* alias_objrefs(PyObject* self, PyObject* args) {
Worker* worker;
ObjRef alias_objref;
ObjRef target_objref;
if (!PyArg_ParseTuple(args, "O&O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &alias_objref, &PyObjectToObjRef, &target_objref)) {
return NULL;
}
worker->alias_objrefs(alias_objref, target_objref);
Py_RETURN_NONE;
}
PyObject* start_worker_service(PyObject* self, PyObject* args) { PyObject* start_worker_service(PyObject* self, PyObject* args) {
Worker* worker; Worker* worker;
if (!PyArg_ParseTuple(args, "O&", &PyObjectToWorker, &worker)) { if (!PyArg_ParseTuple(args, "O&", &PyObjectToWorker, &worker)) {
@ -630,6 +642,7 @@ static PyMethodDef OrchPyLibMethods[] = {
{ "get_object", get_object, METH_VARARGS, "get protocol buffer object from the local object store" }, { "get_object", get_object, METH_VARARGS, "get protocol buffer object from the local object store" },
{ "get_objref", get_objref, METH_VARARGS, "register a new object reference with the scheduler" }, { "get_objref", get_objref, METH_VARARGS, "register a new object reference with the scheduler" },
{ "request_object" , request_object, METH_VARARGS, "request an object to be delivered to the local object store" }, { "request_object" , request_object, METH_VARARGS, "request an object to be delivered to the local object store" },
{ "alias_objrefs", alias_objrefs, METH_VARARGS, "make two objrefs refer to the same object" },
{ "wait_for_next_task", wait_for_next_task, METH_VARARGS, "get next task from scheduler (blocking)" }, { "wait_for_next_task", wait_for_next_task, METH_VARARGS, "get next task from scheduler (blocking)" },
{ "remote_call", remote_call, METH_VARARGS, "call a remote function" }, { "remote_call", remote_call, METH_VARARGS, "call a remote function" },
{ "notify_task_completed", notify_task_completed, METH_VARARGS, "notify the scheduler that a task has been completed" }, { "notify_task_completed", notify_task_completed, METH_VARARGS, "notify the scheduler that a task has been completed" },

View file

@ -56,6 +56,33 @@ Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequ
return Status::OK; return Status::OK;
} }
Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefsRequest* request, AckReply* reply) {
ObjRef alias_objref = request->alias_objref();
ObjRef target_objref = request->target_objref();
objtable_lock_.lock();
size_t size = objtable_.size();
objtable_lock_.unlock();
if (alias_objref >= size) {
ORCH_LOG(ORCH_FATAL, "internal error: no object with objref " << alias_objref << " exists");
}
if (target_objref >= size) {
ORCH_LOG(ORCH_FATAL, "internal error: no object with objref " << target_objref << " exists");
}
{
std::lock_guard<std::mutex> objstore_lock(target_objrefs_lock_);
if (target_objrefs_[alias_objref] != UNITIALIZED_ALIAS) {
ORCH_LOG(ORCH_FATAL, "internal error: attempting to alias objref " << alias_objref << " with objref " << target_objref << ", but objref " << alias_objref << " has already been aliased with objref " << target_objrefs_[alias_objref]);
}
target_objrefs_[alias_objref] = target_objref;
}
schedule();
return Status::OK;
}
Status SchedulerService::RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) { Status SchedulerService::RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) {
std::lock_guard<std::mutex> objstore_lock(objstores_lock_); std::lock_guard<std::mutex> objstore_lock(objstores_lock_);
ObjStoreId objstoreid = objstores_.size(); ObjStoreId objstoreid = objstores_.size();
@ -84,8 +111,10 @@ Status SchedulerService::RegisterFunction(ServerContext* context, const Register
} }
Status SchedulerService::ObjReady(ServerContext* context, const ObjReadyRequest* request, AckReply* reply) { Status SchedulerService::ObjReady(ServerContext* context, const ObjReadyRequest* request, AckReply* reply) {
ORCH_LOG(ORCH_VERBOSE, "object " << request->objref() << " ready on store " << request->objstoreid()); ObjRef objref = request->objref();
add_location(request->objref(), request->objstoreid()); ORCH_LOG(ORCH_VERBOSE, "object " << objref << " ready on store " << request->objstoreid());
add_canonical_objref(objref);
add_location(objref, request->objstoreid());
schedule(); schedule();
return Status::OK; return Status::OK;
} }
@ -106,13 +135,18 @@ Status SchedulerService::SchedulerDebugInfo(ServerContext* context, const Schedu
} }
void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to) { void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to) {
// deliver_object assumes that the aliasing for objref has already been completed. That is, has_canonical_objref(objref) == true
if (from == to) { if (from == to) {
ORCH_LOG(ORCH_FATAL, "attempting to deliver objref " << objref << " from objstore " << from << " to itself."); ORCH_LOG(ORCH_FATAL, "attempting to deliver objref " << objref << " from objstore " << from << " to itself.");
} }
if (!has_canonical_objref(objref)) {
ORCH_LOG(ORCH_FATAL, "attempting to deliver objref " << objref << ", but this objref does not yet have a canonical objref.");
}
ClientContext context; ClientContext context;
AckReply reply; AckReply reply;
DeliverObjRequest request; DeliverObjRequest request;
request.set_objref(objref); ObjRef canonical_objref = get_canonical_objref(objref);
request.set_objref(canonical_objref);
std::lock_guard<std::mutex> lock(objstores_lock_); std::lock_guard<std::mutex> lock(objstores_lock_);
request.set_objstore_address(objstores_[to].address); request.set_objstore_address(objstores_[to].address);
objstores_[from].objstore_stub->DeliverObj(&context, request, &reply); objstores_[from].objstore_stub->DeliverObj(&context, request, &reply);
@ -120,20 +154,36 @@ void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId
void SchedulerService::schedule() { void SchedulerService::schedule() {
// TODO: don't recheck if nothing changed // TODO: don't recheck if nothing changed
// This method consists of three kinds of tasks, which are queued in pull_queue_,
// task_queue_, and alias_notification_queue_. They should probably be separated
// into three methods, which are all called from this method
// See what we can do in pull_queue_
{ {
std::lock_guard<std::mutex> objtable_lock(objtable_lock_); std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
std::lock_guard<std::mutex> pull_queue_lock(pull_queue_lock_); std::lock_guard<std::mutex> pull_queue_lock(pull_queue_lock_);
// Complete all pull tasks that can be completed. // Complete all pull tasks that can be completed.
for (int i = 0; i < pull_queue_.size(); ++i) { for (int i = 0; i < pull_queue_.size(); ++i) {
const std::pair<WorkerId, ObjRef>& pull = pull_queue_[i]; const std::pair<WorkerId, ObjRef>& pull = pull_queue_[i];
WorkerId workerid = pull.first;
ObjRef objref = pull.second; ObjRef objref = pull.second;
if (objtable_[objref].size() > 0) { WorkerId workerid = pull.first;
if (!std::binary_search(objtable_[objref].begin(), objtable_[objref].end(), get_store(workerid))) { if (!has_canonical_objref(objref)) {
ORCH_LOG(ORCH_DEBUG, "objref " << objref << " does not have a canonical_objref, so continuing");
continue;
}
ObjRef canonical_objref = get_canonical_objref(objref);
ORCH_LOG(ORCH_DEBUG, "attempting to pull objref " << pull.second << " with canonical objref " << canonical_objref);
if (objtable_[canonical_objref].size() > 0) {
if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), get_store(workerid))) {
// The worker's local object store does not already contain objref, so ship // The worker's local object store does not already contain objref, so ship
// it there from an object store that does have it. // it there from an object store that does have it.
ObjStoreId objstoreid = pick_objstore(objref); ObjStoreId objstoreid = pick_objstore(canonical_objref);
deliver_object(objref, objstoreid, get_store(workerid)); deliver_object(canonical_objref, objstoreid, get_store(workerid));
}
{
// Notify the relevant objstore about potential aliasing when it's ready
std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_);
alias_notification_queue_.push_back(std::make_pair(get_store(workerid), std::make_pair(objref, canonical_objref)));
} }
// Remove the pull task from the queue // Remove the pull task from the queue
std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]); std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]);
@ -142,6 +192,7 @@ void SchedulerService::schedule() {
} }
} }
} }
// See what we can do in task_queue_
{ {
std::lock_guard<std::mutex> fntable_lock(fntable_lock_); std::lock_guard<std::mutex> fntable_lock(fntable_lock_);
std::lock_guard<std::mutex> avail_workers_lock(avail_workers_lock_); std::lock_guard<std::mutex> avail_workers_lock(avail_workers_lock_);
@ -166,9 +217,26 @@ void SchedulerService::schedule() {
} }
} }
} }
// See what we can do in alias_notification_queue_
{
std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_);
for (int i = 0; i < alias_notification_queue_.size(); ++i) {
const std::pair<WorkerId, std::pair<ObjRef, ObjRef> > alias_notification = alias_notification_queue_[i];
ObjStoreId objstoreid = alias_notification.first;
ObjRef alias_objref = alias_notification.second.first;
ObjRef canonical_objref = alias_notification.second.second;
if (attempt_notify_alias(objstoreid, alias_objref, canonical_objref)) { // this locks both the objstore_ and objtable_
// the attempt to notify the objstore of the objref aliasing succeeded, so remove the notification task from the queue
std::swap(alias_notification_queue_[i], alias_notification_queue_[alias_notification_queue_.size() - 1]);
alias_notification_queue_.pop_back();
i -= 1;
}
}
}
} }
void SchedulerService::submit_task(std::unique_ptr<Call> call, WorkerId workerid) { void SchedulerService::submit_task(std::unique_ptr<Call> call, WorkerId workerid) {
// submit task assumes that the canonical objrefs for its arguments are all ready, that is has_canonical_objref() is true for all of the call's arguments
ClientContext context; ClientContext context;
InvokeCallRequest request; InvokeCallRequest request;
InvokeCallReply reply; InvokeCallReply reply;
@ -176,12 +244,20 @@ void SchedulerService::submit_task(std::unique_ptr<Call> call, WorkerId workerid
for (size_t i = 0; i < call->arg_size(); ++i) { for (size_t i = 0; i < call->arg_size(); ++i) {
if (!call->arg(i).has_obj()) { if (!call->arg(i).has_obj()) {
ObjRef objref = call->arg(i).ref(); ObjRef objref = call->arg(i).ref();
ORCH_LOG(ORCH_INFO, "call contains object ref " << objref); ObjRef canonical_objref = get_canonical_objref(objref);
{
// Notify the relevant objstore about potential aliasing when it's ready
std::lock_guard<std::mutex> alias_notification_queue_lock(alias_notification_queue_lock_);
alias_notification_queue_.push_back(std::make_pair(get_store(workerid), std::make_pair(objref, canonical_objref)));
}
attempt_notify_alias(get_store(workerid), objref, canonical_objref);
ORCH_LOG(ORCH_INFO, "call contains object ref " << canonical_objref);
std::lock_guard<std::mutex> objtable_lock(objtable_lock_); std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
auto &objstores = objtable_[call->arg(i).ref()]; auto &objstores = objtable_[canonical_objref];
std::lock_guard<std::mutex> workers_lock(workers_lock_); std::lock_guard<std::mutex> workers_lock(workers_lock_);
if (!std::binary_search(objstores.begin(), objstores.end(), workers_[workerid].objstoreid)) { if (!std::binary_search(objstores.begin(), objstores.end(), workers_[workerid].objstoreid)) { // TODO(rkn): replace this with get_store
deliver_object(objref, pick_objstore(objref), workers_[workerid].objstoreid); deliver_object(canonical_objref, pick_objstore(canonical_objref), workers_[workerid].objstoreid); // TODO(rkn): replace this with get_store
} }
} }
} }
@ -194,7 +270,11 @@ bool SchedulerService::can_run(const Call& task) {
for (int i = 0; i < task.arg_size(); ++i) { for (int i = 0; i < task.arg_size(); ++i) {
if (!task.arg(i).has_obj()) { if (!task.arg(i).has_obj()) {
ObjRef objref = task.arg(i).ref(); ObjRef objref = task.arg(i).ref();
if (objref >= objtable_.size() || objtable_[objref].size() == 0) { if (!has_canonical_objref(objref)) {
return false;
}
ObjRef canonical_objref = get_canonical_objref(objref);
if (canonical_objref >= objtable_.size() || objtable_[canonical_objref].size() == 0) {
return false; return false;
} }
} }
@ -234,24 +314,46 @@ WorkerId SchedulerService::register_worker(const std::string& worker_address, co
} }
ObjRef SchedulerService::register_new_object() { ObjRef SchedulerService::register_new_object() {
std::lock_guard<std::mutex> lock(objtable_lock_); // If we don't simultaneously lock objtable_ and target_objrefs_, we will probably get errors.
ObjRef result = objtable_.size(); std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
std::lock_guard<std::mutex> target_objrefs_lock(target_objrefs_lock_);
ObjRef objtable_size = objtable_.size();
ObjRef target_objrefs_size = target_objrefs_.size();
if (objtable_size != target_objrefs_size) {
ORCH_LOG(ORCH_FATAL, "objtable_ and target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and target_objrefs_.size() = " << target_objrefs_size);
}
objtable_.push_back(std::vector<ObjStoreId>()); objtable_.push_back(std::vector<ObjStoreId>());
return result; target_objrefs_.push_back(UNITIALIZED_ALIAS);
return objtable_size;
} }
void SchedulerService::add_location(ObjRef objref, ObjStoreId objstoreid) { void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstoreid) {
// add_location must be called with a canonical objref
if (!is_canonical(canonical_objref)) {
ORCH_LOG(ORCH_FATAL, "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")");
}
std::lock_guard<std::mutex> objtable_lock(objtable_lock_); std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
if (objref >= objtable_.size()) { if (canonical_objref >= objtable_.size()) {
ORCH_LOG(ORCH_FATAL, "trying to put object on object store that was not registered with the scheduler"); ORCH_LOG(ORCH_FATAL, "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")");
} }
// do a binary search // do a binary search
auto pos = std::lower_bound(objtable_[objref].begin(), objtable_[objref].end(), objstoreid); auto pos = std::lower_bound(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid);
if (pos == objtable_[objref].end() || objstoreid < *pos) { if (pos == objtable_[canonical_objref].end() || objstoreid < *pos) {
objtable_[objref].insert(pos, objstoreid); objtable_[canonical_objref].insert(pos, objstoreid);
} }
} }
void SchedulerService::add_canonical_objref(ObjRef objref) {
std::lock_guard<std::mutex> lock(target_objrefs_lock_);
if (objref >= target_objrefs_.size()) {
ORCH_LOG(ORCH_FATAL, "internal error: attempting to insert objref " << objref << " in target_objrefs_, but target_objrefs_.size() is " << target_objrefs_.size());
}
if (target_objrefs_[objref] != UNITIALIZED_ALIAS) {
ORCH_LOG(ORCH_FATAL, "internal error: attempting to declare objref " << objref << " as a canonical objref, but target_objrefs_[objref] is already aliased with objref " << target_objrefs_[objref]);
}
target_objrefs_[objref] = objref;
}
ObjStoreId SchedulerService::get_store(WorkerId workerid) { ObjStoreId SchedulerService::get_store(WorkerId workerid) {
std::lock_guard<std::mutex> lock(workers_lock_); std::lock_guard<std::mutex> lock(workers_lock_);
ObjStoreId result = workers_[workerid].objstoreid; ObjStoreId result = workers_[workerid].objstoreid;
@ -288,12 +390,84 @@ void SchedulerService::debug_info(const SchedulerDebugInfoRequest& request, Sche
avail_workers_lock_.unlock(); avail_workers_lock_.unlock();
} }
ObjStoreId SchedulerService::pick_objstore(ObjRef objref) { ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) {
// pick_objstore must be called with a canonical_objref
std::mt19937 rng; std::mt19937 rng;
std::uniform_int_distribution<int> uni(0, objtable_[objref].size()-1); if (!is_canonical(canonical_objref)) {
ORCH_LOG(ORCH_FATAL, "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")");
}
std::uniform_int_distribution<int> uni(0, objtable_[canonical_objref].size() - 1);
return uni(rng); return uni(rng);
} }
bool SchedulerService::is_canonical(ObjRef objref) {
std::lock_guard<std::mutex> lock(target_objrefs_lock_);
if (target_objrefs_[objref] == UNITIALIZED_ALIAS) {
ORCH_LOG(ORCH_FATAL, "Attempting to call is_canonical on an objref for which aliasing is not complete or the object is not ready, target_objrefs_[objref] == UNITIALIZED_ALIAS for objref " << objref << ".");
}
return objref == target_objrefs_[objref];
}
bool SchedulerService::has_canonical_objref(ObjRef objref) {
std::lock_guard<std::mutex> lock(target_objrefs_lock_);
ObjRef objref_temp = objref;
while (true) {
if (objref_temp >= target_objrefs_.size()) {
ORCH_LOG(ORCH_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size());
}
if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) {
return false;
}
if (target_objrefs_[objref_temp] == objref_temp) {
return true;
}
objref_temp = target_objrefs_[objref_temp];
}
}
ObjRef SchedulerService::get_canonical_objref(ObjRef objref) {
// get_canonical_objref assumes that has_canonical_objref(objref) is true
std::lock_guard<std::mutex> lock(target_objrefs_lock_);
ObjRef objref_temp = objref;
while (true) {
if (objref_temp >= target_objrefs_.size()) {
ORCH_LOG(ORCH_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size());
}
if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) {
ORCH_LOG(ORCH_FATAL, "Attempting to get canonical objref for objref " << objref << ", which aliases, objref " << objref_temp << ", but target_objrefs_[objref_temp] == UNITIALIZED_ALIAS for objref_temp = " << objref_temp << ".");
}
if (target_objrefs_[objref_temp] == objref_temp) {
return objref_temp;
}
objref_temp = target_objrefs_[objref_temp];
ORCH_LOG(ORCH_DEBUG, "Looping in get_canonical_objref.");
}
}
bool SchedulerService::attempt_notify_alias(ObjStoreId objstoreid, ObjRef alias_objref, ObjRef canonical_objref) {
// return true if successful and false otherwise
if (alias_objref == canonical_objref) {
// no need to do anything
return true;
}
{
std::lock_guard<std::mutex> lock(objtable_lock_);
if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) {
// the objstore doesn't have the object for canonical_objref yet, so it's too early to notify the objstore about the alias
return false;
}
}
ClientContext context;
AckReply reply;
NotifyAliasRequest request;
request.set_alias_objref(alias_objref);
request.set_canonical_objref(canonical_objref);
objstores_lock_.lock();
objstores_[objstoreid].objstore_stub->NotifyAlias(&context, request, &reply);
objstores_lock_.unlock();
return true;
}
void start_scheduler_service(const char* server_address) { void start_scheduler_service(const char* server_address) {
SchedulerService service; SchedulerService service;
ServerBuilder builder; ServerBuilder builder;

View file

@ -6,6 +6,7 @@
#include <memory> #include <memory>
#include <algorithm> #include <algorithm>
#include <iostream> #include <iostream>
#include <limits>
#include <grpc++/grpc++.h> #include <grpc++/grpc++.h>
@ -23,6 +24,8 @@ using grpc::ClientContext;
using grpc::Channel; using grpc::Channel;
const ObjRef UNITIALIZED_ALIAS = std::numeric_limits<ObjRef>::max();
struct WorkerHandle { struct WorkerHandle {
std::shared_ptr<Channel> channel; std::shared_ptr<Channel> channel;
std::unique_ptr<WorkerService::Stub> worker_stub; std::unique_ptr<WorkerService::Stub> worker_stub;
@ -40,6 +43,7 @@ public:
Status RemoteCall(ServerContext* context, const RemoteCallRequest* request, RemoteCallReply* reply) override; Status RemoteCall(ServerContext* context, const RemoteCallRequest* request, RemoteCallReply* reply) override;
Status PushObj(ServerContext* context, const PushObjRequest* request, PushObjReply* reply) override; Status PushObj(ServerContext* context, const PushObjRequest* request, PushObjReply* reply) override;
Status RequestObj(ServerContext* context, const RequestObjRequest* request, AckReply* reply) override; Status RequestObj(ServerContext* context, const RequestObjRequest* request, AckReply* reply) override;
Status AliasObjRefs(ServerContext* context, const AliasObjRefsRequest* request, AckReply* reply) override;
Status RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) override; Status RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) override;
Status RegisterWorker(ServerContext* context, const RegisterWorkerRequest* request, RegisterWorkerReply* reply) override; Status RegisterWorker(ServerContext* context, const RegisterWorkerRequest* request, RegisterWorkerReply* reply) override;
Status RegisterFunction(ServerContext* context, const RegisterFunctionRequest* request, AckReply* reply) override; Status RegisterFunction(ServerContext* context, const RegisterFunctionRequest* request, AckReply* reply) override;
@ -61,6 +65,8 @@ public:
ObjRef register_new_object(); ObjRef register_new_object();
// register the location of the object reference in the object table // register the location of the object reference in the object table
void add_location(ObjRef objref, ObjStoreId objstoreid); void add_location(ObjRef objref, ObjStoreId objstoreid);
// indicate that objref is a canonical objref
void add_canonical_objref(ObjRef objref);
// get object store associated with a workerid // get object store associated with a workerid
ObjStoreId get_store(WorkerId workerid); ObjStoreId get_store(WorkerId workerid);
// register a function with the scheduler // register a function with the scheduler
@ -70,6 +76,14 @@ public:
private: private:
// pick an objectstore that holds a given object (needs protection by objtable_lock_) // pick an objectstore that holds a given object (needs protection by objtable_lock_)
ObjStoreId pick_objstore(ObjRef objref); ObjStoreId pick_objstore(ObjRef objref);
// checks if objref is a canonical objref
bool is_canonical(ObjRef objref);
// checks if aliasing for objref has been completed
bool has_canonical_objref(ObjRef objref);
// get the canonical objref for an objref
ObjRef get_canonical_objref(ObjRef objref);
// attempt to notify the objstore about potential objref aliasing, returns true if successful, if false then retry later
bool attempt_notify_alias(ObjStoreId objstoreid, ObjRef alias_objref, ObjRef canonical_objref);
// Vector of all workers registered in the system. Their index in this vector // Vector of all workers registered in the system. Their index in this vector
// is the workerid. // is the workerid.
@ -82,7 +96,16 @@ private:
// vector is the objstoreid. // vector is the objstoreid.
std::vector<ObjStoreHandle> objstores_; std::vector<ObjStoreHandle> objstores_;
grpc::mutex objstores_lock_; grpc::mutex objstores_lock_;
// Mapping from objref to list of object stores where the object is stored.
// Mapping from an aliased objref to the objref it is aliased with. If an
// objref is a canonical objref (meaning it is not aliased), then
// target_objrefs_[objref] == objref. For each objref, target_objrefs_[objref]
// is initialized to UNITIALIZED_ALIAS and the correct value is filled later
// when it is known.
std::vector<ObjRef> target_objrefs_;
std::mutex target_objrefs_lock_;
// Mapping from canonical objref to list of object stores where the object is stored. Non-canonical (aliased) objrefs should not be used to index objtable_.
ObjTable objtable_; ObjTable objtable_;
std::mutex objtable_lock_; std::mutex objtable_lock_;
// Hash map from function names to workers where the function is registered. // Hash map from function names to workers where the function is registered.
@ -94,6 +117,9 @@ private:
// List of pending pull calls. // List of pending pull calls.
std::vector<std::pair<WorkerId, ObjRef> > pull_queue_; std::vector<std::pair<WorkerId, ObjRef> > pull_queue_;
std::mutex pull_queue_lock_; std::mutex pull_queue_lock_;
// List of pending alias notifications. Each element consists of (objstoreid, (alias_objref, canonical_objref)).
std::vector<std::pair<ObjStoreId, std::pair<ObjRef, ObjRef> > > alias_notification_queue_;
std::mutex alias_notification_queue_lock_;
}; };
#endif #endif

View file

@ -56,6 +56,7 @@ ObjRef Worker::get_objref() {
} }
slice Worker::get_object(ObjRef objref) { slice Worker::get_object(ObjRef objref) {
// get_object assumes that objref is a canonical objref
ObjRequest request; ObjRequest request;
request.workerid = workerid_; request.workerid = workerid_;
request.type = ObjRequestType::GET; request.type = ObjRequestType::GET;
@ -83,7 +84,7 @@ void Worker::put_object(ObjRef objref, const Obj* obj) {
receive_obj_queue_.receive(&result); receive_obj_queue_.receive(&result);
uint8_t* target = segmentpool_.get_address(result); uint8_t* target = segmentpool_.get_address(result);
std::memcpy(target, &data[0], data.size()); std::memcpy(target, &data[0], data.size());
request.type = ObjRequestType::DONE; request.type = ObjRequestType::WORKER_DONE;
request.metadata_offset = 0; request.metadata_offset = 0;
request_obj_queue_.send(&request); request_obj_queue_.send(&request);
} }
@ -99,7 +100,7 @@ void Worker::put_arrow(ObjRef objref, PyArrayObject* array) {
ObjHandle result; ObjHandle result;
receive_obj_queue_.receive(&result); receive_obj_queue_.receive(&result);
store_arrow(array, result, &segmentpool_); store_arrow(array, result, &segmentpool_);
request.type = ObjRequestType::DONE; request.type = ObjRequestType::WORKER_DONE;
request.metadata_offset = result.metadata_offset(); request.metadata_offset = result.metadata_offset();
request_obj_queue_.send(&request); request_obj_queue_.send(&request);
} }
@ -126,6 +127,15 @@ bool Worker::is_arrow(ObjRef objref) {
return result.metadata_offset() != 0; return result.metadata_offset() != 0;
} }
void Worker::alias_objrefs(ObjRef alias_objref, ObjRef target_objref) {
ClientContext context;
AliasObjRefsRequest request;
request.set_alias_objref(alias_objref);
request.set_target_objref(target_objref);
AckReply reply;
scheduler_stub_->AliasObjRefs(&context, request, &reply);
}
void Worker::register_function(const std::string& name, size_t num_return_vals) { void Worker::register_function(const std::string& name, size_t num_return_vals) {
ClientContext context; ClientContext context;
RegisterFunctionRequest request; RegisterFunctionRequest request;

View file

@ -58,6 +58,8 @@ class Worker {
PyArrayObject* get_arrow(ObjRef objref); PyArrayObject* get_arrow(ObjRef objref);
// determine if the object stored in objref is an arrow object // TODO(pcm): more general mechanism for this? // determine if the object stored in objref is an arrow object // TODO(pcm): more general mechanism for this?
bool is_arrow(ObjRef objref); bool is_arrow(ObjRef objref);
// make `alias_objref` refer to the same object that `target_objref` refers to
void alias_objrefs(ObjRef alias_objref, ObjRef target_objref);
// register function with scheduler // register function with scheduler
void register_function(const std::string& name, size_t num_return_vals); void register_function(const std::string& name, size_t num_return_vals);
// start the worker server which accepts tasks from the scheduler and stores // start the worker server which accepts tasks from the scheduler and stores

View file

@ -140,21 +140,20 @@ class WorkerTest(unittest.TestCase):
services.cleanup() services.cleanup()
"""
class APITest(unittest.TestCase): class APITest(unittest.TestCase):
def testObjRefAliasing(self): def testObjRefAliasing(self):
services.start_scheduler(address(IP_ADDRESS, new_scheduler_port())) w = worker.Worker()
time.sleep(0.1)
services.start_objstore(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, new_objstore_port()))
time.sleep(0.2)
worker1 = worker.Worker()
orchpy.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, objstore_port), address(IP_ADDRESS, new_worker_port()), worker1)
test_dir = os.path.dirname(os.path.abspath(__file__)) test_dir = os.path.dirname(os.path.abspath(__file__))
test_path = os.path.join(test_dir, "testrecv.py") test_path = os.path.join(test_dir, "testrecv.py")
services.start_worker(test_path, address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, objstore_port), address(IP_ADDRESS, new_worker_port())) services.start_cluster(num_workers=3, worker_path=test_path, driver_worker=w)
"""
objref = w.remote_call("__main__.test_alias_f", [])
self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5])))
objref = w.remote_call("__main__.test_alias_g", [])
self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5])))
objref = w.remote_call("__main__.test_alias_h", [])
self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5])))
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View file

@ -1,4 +1,5 @@
import argparse import argparse
import numpy as np
import orchpy import orchpy
import orchpy.services as services import orchpy.services as services
@ -18,6 +19,18 @@ parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str,
parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address") parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address")
parser.add_argument("--worker-address", default="127.0.0.1:30001", type=str, help="the worker's address") parser.add_argument("--worker-address", default="127.0.0.1:30001", type=str, help="the worker's address")
@orchpy.distributed([], [np.ndarray])
def test_alias_f():
return np.ones([3, 4, 5])
@orchpy.distributed([], [np.ndarray])
def test_alias_g():
return test_alias_f()
@orchpy.distributed([], [np.ndarray])
def test_alias_h():
return test_alias_g()
@orchpy.distributed([str], [str]) @orchpy.distributed([str], [str])
def print_string(string): def print_string(string):
print "called print_string with", string print "called print_string with", string

View file

@ -20,8 +20,11 @@ def test_alias_f():
@orchpy.distributed([], [np.ndarray]) @orchpy.distributed([], [np.ndarray])
def test_alias_g(): def test_alias_g():
return f() return test_alias_f()
@orchpy.distributed([], [np.ndarray])
def test_alias_h():
return test_alias_g()
@orchpy.distributed([str], [str]) @orchpy.distributed([str], [str])
def print_string(string): def print_string(string):