mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
store objects in objstore
This commit is contained in:
parent
5376f14149
commit
0abcf46c56
1 changed files with 12 additions and 15 deletions
|
@ -294,15 +294,18 @@ cdef class Worker:
|
|||
while True:
|
||||
name, args, returnrefs = self.wait_for_next_task()
|
||||
print "got returnref", returnrefs
|
||||
self.functions[name].executor(args, returnrefs)
|
||||
# self.invoke_function(name, args)
|
||||
|
||||
result = self.functions[name].executor(args, returnrefs)
|
||||
if len(returnrefs) == 1:
|
||||
self.put_obj(returnrefs[0], result)
|
||||
else:
|
||||
for i in range(len(returnrefs)):
|
||||
self.put_obj(returnrefs[i], result[i])
|
||||
|
||||
global_worker = Worker()
|
||||
|
||||
def distributed(types, return_type, worker=global_worker):
|
||||
def distributed(types, return_types, worker=global_worker):
|
||||
def distributed_decorator(func):
|
||||
# deserialize arguments, execute function and serialize result
|
||||
# deserialize arguments, execute function and return results
|
||||
def func_executor(args, returnrefs):
|
||||
arguments = []
|
||||
for (i, arg) in enumerate(args):
|
||||
|
@ -323,16 +326,10 @@ def distributed(types, return_type, worker=global_worker):
|
|||
# buf = bytearray()
|
||||
print "called with arguments", arguments
|
||||
result = func(*arguments)
|
||||
# if unison.unison_type(result) != return_type:
|
||||
# raise Exception("Return type of " + func.func_name + " does not match the return type specified in the @distributed decorator, was expecting " + str(return_type) + " but received " + str(unison.unison_type(result)))
|
||||
# unison.serialize(buf, result)
|
||||
# return memoryview(buf).tobytes()
|
||||
# return result
|
||||
# obj = ObjWrapper()
|
||||
# serialize_into_2(result, obj.get_value())
|
||||
# print "put was sucessful? seems like so"
|
||||
for i in range(len(returnrefs)):
|
||||
worker.put_obj(returnrefs[i], result[i])
|
||||
# check number of return values and return types
|
||||
if len(return_types) != 1 and len(result) != len(return_types):
|
||||
raise Exception("The @distributed decorator for function " + func.__name__ + " has " + str(len(return_types)) + " return values with types " + str(return_types) + " but " + func.__name__ + " returned " + str(len(result)) + " values.")
|
||||
return result
|
||||
# for remotely executing the function
|
||||
def func_call(*args, typecheck=False):
|
||||
return worker.call(func_call.func_name, func_call.module_name, args)
|
||||
|
|
Loading…
Add table
Reference in a new issue