mirror of
https://github.com/vale981/ray
synced 2025-03-06 18:41:40 -05:00
Fix crash for Java task's task.argument()
in state. (#4063)
This commit is contained in:
parent
cfc7e2c5a9
commit
7574757391
3 changed files with 21 additions and 11 deletions
|
@ -311,6 +311,7 @@ class GlobalState(object):
|
|||
function_descriptor_list = task.function_descriptor_list()
|
||||
function_descriptor = FunctionDescriptor.from_bytes_list(
|
||||
function_descriptor_list)
|
||||
|
||||
task_spec_info = {
|
||||
"DriverID": task.driver_id().hex(),
|
||||
"TaskID": task.task_id().hex(),
|
||||
|
|
|
@ -12,7 +12,6 @@ from ray.includes.task cimport (
|
|||
SerializeTaskAsString,
|
||||
)
|
||||
|
||||
from ray.utils import _random_string
|
||||
|
||||
cdef class Task:
|
||||
cdef:
|
||||
|
@ -140,17 +139,23 @@ cdef class Task:
|
|||
cdef:
|
||||
CTaskSpecification *task_spec = self.task_spec.get()
|
||||
int64_t num_args = task_spec.NumArgs()
|
||||
int32_t lang = <int32_t>task_spec.GetLanguage()
|
||||
int count
|
||||
arg_list = []
|
||||
for i in range(num_args):
|
||||
count = task_spec.ArgIdCount(i)
|
||||
if count > 0:
|
||||
assert count == 1
|
||||
arg_list.append(ObjectID.from_native(task_spec.ArgId(i, 0)))
|
||||
else:
|
||||
serialized_str = task_spec.ArgVal(i)[:task_spec.ArgValLength(i)]
|
||||
obj = pickle.loads(serialized_str)
|
||||
arg_list.append(obj)
|
||||
|
||||
if lang == <int32_t>LANGUAGE_PYTHON:
|
||||
for i in range(num_args):
|
||||
count = task_spec.ArgIdCount(i)
|
||||
if count > 0:
|
||||
assert count == 1
|
||||
arg_list.append(ObjectID.from_native(task_spec.ArgId(i, 0)))
|
||||
else:
|
||||
serialized_str = task_spec.ArgVal(i)[:task_spec.ArgValLength(i)]
|
||||
obj = pickle.loads(serialized_str)
|
||||
arg_list.append(obj)
|
||||
elif lang == <int32_t>LANGUAGE_JAVA:
|
||||
arg_list = num_args * ["<java-argument>"]
|
||||
|
||||
return arg_list
|
||||
|
||||
def returns(self):
|
||||
|
@ -178,6 +183,10 @@ cdef class Task:
|
|||
postincrement(iterator)
|
||||
return required_resources
|
||||
|
||||
def language(self):
|
||||
"""Return the language of the task."""
|
||||
return Language.from_native(self.task_spec.get().GetLanguage())
|
||||
|
||||
def actor_creation_id(self):
|
||||
"""Return the actor creation ID for the task."""
|
||||
return ActorID.from_native(self.task_spec.get().ActorCreationId())
|
||||
|
|
|
@ -1134,7 +1134,7 @@ def build_java_worker_command(
|
|||
command += "-Dray.raylet.socket-name={} ".format(raylet_name)
|
||||
|
||||
if redis_password is not None:
|
||||
command += ("-Dray.redis-password=%s", redis_password)
|
||||
command += "-Dray.redis.password={} ".format(redis_password)
|
||||
|
||||
command += "-Dray.home={} ".format(RAY_HOME)
|
||||
# TODO(suquark): We should use temp_dir as the input of a java worker.
|
||||
|
|
Loading…
Add table
Reference in a new issue