[2/X][Pipeline] Add python generation for ClassNode (#22617)

- Added backbone of ray dag -> serve dag transformation and deployment extraction.
- Added util functions for deployment unique name generation .. ray_actor_options, replacement of DeploymentNode with deployment handle, etc.
This commit is contained in:
Jiao 2022-02-24 14:01:35 -08:00 committed by GitHub
parent a385c9b127
commit 3c707f70cc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 529 additions and 42 deletions

View file

@ -2,7 +2,10 @@ from ray.experimental.dag.dag_node import DAGNode
from ray.experimental.dag.function_node import FunctionNode
from ray.experimental.dag.class_node import ClassNode, ClassMethodNode
from ray.experimental.dag.input_node import InputNode
from ray.experimental.dag.constants import (
PARENT_CLASS_NODE_KEY,
PREV_CLASS_METHOD_CALL_KEY,
)
__all__ = [
"ClassNode",
@ -10,4 +13,6 @@ __all__ = [
"DAGNode",
"FunctionNode",
"InputNode",
"PARENT_CLASS_NODE_KEY",
"PREV_CLASS_METHOD_CALL_KEY",
]

View file

@ -2,6 +2,10 @@ import ray
from ray.experimental.dag.dag_node import DAGNode
from ray.experimental.dag.input_node import InputNode
from ray.experimental.dag.format_utils import get_dag_node_str
from ray.experimental.dag.constants import (
PARENT_CLASS_NODE_KEY,
PREV_CLASS_METHOD_CALL_KEY,
)
from typing import Any, Dict, List, Optional, Tuple
@ -85,8 +89,8 @@ class _UnboundClassMethodNode(object):
def _bind(self, *args, **kwargs):
other_args_to_resolve = {
"parent_class_node": self._actor,
"prev_class_method_call": self._actor._last_call,
PARENT_CLASS_NODE_KEY: self._actor,
PREV_CLASS_METHOD_CALL_KEY: self._actor._last_call,
}
node = ClassMethodNode(
@ -122,13 +126,13 @@ class ClassMethodNode(DAGNode):
self._method_name: str = method_name
# Parse other_args_to_resolve and assign to variables
self._parent_class_node: ClassNode = other_args_to_resolve.get(
"parent_class_node"
PARENT_CLASS_NODE_KEY
)
# Used to track lineage of ClassMethodCall to preserve deterministic
# submission and execution order.
self._prev_class_method_call: Optional[
ClassMethodNode
] = other_args_to_resolve.get("prev_class_method_call", None)
] = other_args_to_resolve.get(PREV_CLASS_METHOD_CALL_KEY, None)
# The actor creation task dependency is encoded as the first argument,
# and the ordering dependency as the second, which ensures they are
# executed prior to this node.

View file

@ -0,0 +1,3 @@
# Reserved keys used to handle ClassMethodNode in Ray DAG building.
PARENT_CLASS_NODE_KEY = "parent_class_node"
PREV_CLASS_METHOD_CALL_KEY = "prev_class_method_call"

View file

@ -2,7 +2,11 @@ import pytest
import pickle
import ray
from ray.experimental.dag import DAGNode
from ray.experimental.dag import (
DAGNode,
PARENT_CLASS_NODE_KEY,
PREV_CLASS_METHOD_CALL_KEY,
)
@ray.remote
@ -147,36 +151,36 @@ def test_actor_options_complicated(shared_ray_instance):
assert test_a2.get_options() == {} # No .options() at outer call
# refer to a2 constructor .options() call
assert (
test_a2.get_other_args_to_resolve()["parent_class_node"]
test_a2.get_other_args_to_resolve()[PARENT_CLASS_NODE_KEY]
.get_options()
.get("name")
== "a2_v0"
)
# refer to actor method a2.inc.options() call
assert (
test_a2.get_other_args_to_resolve()["prev_class_method_call"]
test_a2.get_other_args_to_resolve()[PREV_CLASS_METHOD_CALL_KEY]
.get_options()
.get("name")
== "v3"
)
# refer to a1 constructor .options() call
assert (
test_a1.get_other_args_to_resolve()["parent_class_node"]
test_a1.get_other_args_to_resolve()[PARENT_CLASS_NODE_KEY]
.get_options()
.get("name")
== "a1_v1"
)
# refer to latest actor method a1.inc.options() call
assert (
test_a1.get_other_args_to_resolve()["prev_class_method_call"]
test_a1.get_other_args_to_resolve()[PREV_CLASS_METHOD_CALL_KEY]
.get_options()
.get("name")
== "v2"
)
# refer to first bound actor method a1.inc.options() call
assert (
test_a1.get_other_args_to_resolve()["prev_class_method_call"]
.get_other_args_to_resolve()["prev_class_method_call"]
test_a1.get_other_args_to_resolve()[PREV_CLASS_METHOD_CALL_KEY]
.get_other_args_to_resolve()[PREV_CLASS_METHOD_CALL_KEY]
.get_options()
.get("name")
== "v1"

View file

@ -0,0 +1,5 @@
# Reserved constant used as key in other_args_to_resolve to configure if we
# return sync or async handle of a deployment.
# True -> RayServeSyncHandle
# False -> RayServeHandle
USE_SYNC_HANDLE_KEY = "use_sync_handle"

View file

@ -1,9 +1,10 @@
from typing import Any, Dict, Optional, Tuple, List
from typing import Any, Dict, Optional, Tuple, List, Union
from ray.experimental.dag import DAGNode
from ray.experimental.dag.format_utils import get_dag_node_str
from ray.serve.api import Deployment
from ray.serve.handle import RayServeSyncHandle, RayServeHandle
from ray.serve.pipeline.constants import USE_SYNC_HANDLE_KEY
class DeploymentMethodNode(DAGNode):
@ -26,16 +27,9 @@ class DeploymentMethodNode(DAGNode):
method_options,
other_args_to_resolve=other_args_to_resolve,
)
# Serve handle is sync by default.
if (
"sync_handle" in self._bound_other_args_to_resolve
and self._bound_other_args_to_resolve.get("sync_handle") is True
):
self._deployment_handle: RayServeSyncHandle = deployment.get_handle(
sync=True
)
else:
self._deployment_handle: RayServeHandle = deployment.get_handle(sync=False)
self._deployment_handle: Union[
RayServeHandle, RayServeSyncHandle
] = self._get_serve_deployment_handle(deployment, other_args_to_resolve)
def _copy_impl(
self,
@ -63,6 +57,39 @@ class DeploymentMethodNode(DAGNode):
**self._bound_kwargs,
)
def _get_serve_deployment_handle(
self,
deployment: Deployment,
bound_other_args_to_resolve: Dict[str, Any],
) -> Union[RayServeHandle, RayServeSyncHandle]:
"""
Return a sync or async handle of the encapsulated Deployment based on
config.
Args:
deployment (Deployment): Deployment instance wrapped in the DAGNode.
bound_other_args_to_resolve (Dict[str, Any]): Contains args used
to configure DeploymentNode.
Returns:
RayServeHandle: Default and catch-all is to return sync handle.
return async handle only if user explicitly set
USE_SYNC_HANDLE_KEY with value of False.
"""
if USE_SYNC_HANDLE_KEY not in bound_other_args_to_resolve:
# Return sync RayServeSyncHandle
return deployment.get_handle(sync=True)
elif bound_other_args_to_resolve.get(USE_SYNC_HANDLE_KEY) is True:
# Return sync RayServeSyncHandle
return deployment.get_handle(sync=True)
elif bound_other_args_to_resolve.get(USE_SYNC_HANDLE_KEY) is False:
# Return async RayServeHandle
return deployment.get_handle(sync=False)
else:
raise ValueError(
f"{USE_SYNC_HANDLE_KEY} should only be set with a boolean value."
)
def __str__(self) -> str:
return get_dag_node_str(
self, str(self._method_name) + "() @ " + str(self._body)

View file

@ -1,9 +1,10 @@
from typing import Any, Dict, Optional, List, Tuple
from typing import Any, Dict, Optional, List, Tuple, Union
from ray.experimental.dag import DAGNode, InputNode
from ray.serve.api import Deployment
from ray.serve.handle import RayServeSyncHandle, RayServeHandle
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
from ray.serve.pipeline.constants import USE_SYNC_HANDLE_KEY
from ray.experimental.dag.format_utils import get_dag_node_str
@ -25,16 +26,9 @@ class DeploymentNode(DAGNode):
cls_options,
other_args_to_resolve=other_args_to_resolve,
)
# Serve handle is sync by default.
if (
"sync_handle" in self._bound_other_args_to_resolve
and self._bound_other_args_to_resolve.get("sync_handle") is True
):
self._deployment_handle: RayServeSyncHandle = deployment.get_handle(
sync=True
)
else:
self._deployment_handle: RayServeHandle = deployment.get_handle(sync=False)
self._deployment_handle: Union[
RayServeHandle, RayServeSyncHandle
] = self._get_serve_deployment_handle(deployment, other_args_to_resolve)
if self._contains_input_node():
raise ValueError(
@ -65,6 +59,39 @@ class DeploymentNode(DAGNode):
*self._bound_args, **self._bound_kwargs
)
def _get_serve_deployment_handle(
self,
deployment: Deployment,
bound_other_args_to_resolve: Dict[str, Any],
) -> Union[RayServeHandle, RayServeSyncHandle]:
"""
Return a sync or async handle of the encapsulated Deployment based on
config.
Args:
deployment (Deployment): Deployment instance wrapped in the DAGNode.
bound_other_args_to_resolve (Dict[str, Any]): Contains args used
to configure DeploymentNode.
Returns:
RayServeHandle: Default and catch-all is to return sync handle.
return async handle only if user explicitly set
USE_SYNC_HANDLE_KEY with value of False.
"""
if USE_SYNC_HANDLE_KEY not in bound_other_args_to_resolve:
# Return sync RayServeSyncHandle
return deployment.get_handle(sync=True)
elif bound_other_args_to_resolve.get(USE_SYNC_HANDLE_KEY) is True:
# Return sync RayServeSyncHandle
return deployment.get_handle(sync=True)
elif bound_other_args_to_resolve.get(USE_SYNC_HANDLE_KEY) is False:
# Return async RayServeHandle
return deployment.get_handle(sync=False)
else:
raise ValueError(
f"{USE_SYNC_HANDLE_KEY} should only be set with a boolean value."
)
def _contains_input_node(self) -> bool:
"""Check if InputNode is used in children DAGNodes with current node
as the root.

View file

@ -0,0 +1,164 @@
from typing import Any, Dict, List, Tuple
import threading
from ray.experimental.dag import (
DAGNode,
ClassNode,
ClassMethodNode,
PARENT_CLASS_NODE_KEY,
)
from ray.serve.api import Deployment, DeploymentConfig
from ray.serve.pipeline.deployment_method_node import DeploymentMethodNode
from ray.serve.pipeline.deployment_node import DeploymentNode
class DeploymentNameGenerator(object):
"""
Generate unique suffix for each given deployment_name requested for name.
By default uses deployment_name for the very first time, then append
monotonic increasing id to it.
"""
__lock = threading.Lock()
__shared_state = dict()
@classmethod
def get_deployment_name(cls, dag_node: ClassNode):
assert isinstance(
dag_node, ClassNode
), "get_deployment_name() should only be called on ClassNode instances."
with cls.__lock:
deployment_name = (
dag_node.get_options().get("name", None) or dag_node._body.__name__
)
if deployment_name not in cls.__shared_state:
cls.__shared_state[deployment_name] = 0
return deployment_name
else:
cls.__shared_state[deployment_name] += 1
suffix_num = cls.__shared_state[deployment_name]
return f"{deployment_name}_{suffix_num}"
def _remove_non_default_ray_actor_options(ray_actor_options: Dict[str, Any]):
"""
In Ray DAG building we pass full ray_actor_options regardless if a field
was explicitly set. Since some values are invalid, we need to remove them
from ray_actor_options.
"""
# TODO: (jiaodong) Revisit when we implement build() when user explicitly
# pass default value
ray_actor_options = {k: v for k, v in ray_actor_options.items() if v}
if ray_actor_options.get("placement_group") == "default":
del ray_actor_options["placement_group"]
if ray_actor_options.get("placement_group_bundle_index") == -1:
del ray_actor_options["placement_group_bundle_index"]
if ray_actor_options.get("max_pending_calls") == -1:
del ray_actor_options["max_pending_calls"]
return ray_actor_options
def _replace_init_args_with_deployment_handle(
args: List[Any], kwargs: Dict[str, Any]
) -> Tuple[Tuple[Any], Dict[str, Any]]:
"""
Deployment can be passed into other DAGNodes as init args. This is supported
pattern in ray DAG. Thus we need convert them into deployment handles in
ray serve DAG to make end to end DAG executable.
"""
init_args = []
init_kwargs = {}
# TODO: (jiaodong) Need to handle deeply nested deployment in args
for arg in args:
if isinstance(arg, DeploymentNode):
init_args.append(arg._deployment_handle)
else:
init_args.append(arg)
for key, value in kwargs.items():
if isinstance(value, DeploymentNode):
init_kwargs[key] = value._deployment_handle
else:
init_kwargs[key] = value
return tuple(init_args), init_kwargs
def transform_ray_dag_to_serve_dag(dag_node):
"""
Transform a Ray DAG to a Serve DAG. Map ClassNode to DeploymentNode with
ray decorated body passed in, ans ClassMethodNode to DeploymentMethodNode.
"""
if isinstance(dag_node, ClassNode):
deployment_name = DeploymentNameGenerator.get_deployment_name(dag_node)
ray_actor_options = _remove_non_default_ray_actor_options(
dag_node.get_options()
)
init_args, init_kwargs = _replace_init_args_with_deployment_handle(
dag_node.get_args(), dag_node.get_kwargs()
)
# Deployment class cannot bind with DeploymentNode
new_deployment = Deployment(
dag_node._body,
deployment_name,
DeploymentConfig(),
init_args=init_args,
init_kwargs=init_kwargs,
ray_actor_options=ray_actor_options,
_internal=True,
)
deployment_node = DeploymentNode(
new_deployment,
dag_node.get_args(),
dag_node.get_kwargs(),
# TODO: (jiaodong) Support .options(metadata=xxx) for deployment
{}, # Deployment options are not ray actor options.
other_args_to_resolve=dag_node.get_other_args_to_resolve(),
)
return deployment_node
elif isinstance(dag_node, ClassMethodNode):
other_args_to_resolve = dag_node.get_other_args_to_resolve()
# TODO: (jiaodong) Need to capture DAGNodes in the parent node
parent_deployment_node = other_args_to_resolve[PARENT_CLASS_NODE_KEY]
deployment_method_node = DeploymentMethodNode(
parent_deployment_node._body,
dag_node._method_name,
dag_node.get_args(),
dag_node.get_kwargs(),
dag_node.get_options(),
other_args_to_resolve=dag_node.get_other_args_to_resolve(),
)
return deployment_method_node
else:
# TODO: (jiaodong) Support FunctionNode
return dag_node
def extract_deployments_from_serve_dag(
serve_dag_root: DAGNode,
) -> List[Deployment]:
"""Extract deployment python objects from a transformed serve DAG. Should
only be called after `transform_ray_dag_to_serve_dag`, otherwise nothing
to return.
Args:
serve_dag_root (DAGNode): Transformed serve dag root node.
Returns:
List[Deployment]: List of deployment python objects fetched from serve
dag.
"""
deployments = {}
def extractor(dag_node):
if isinstance(dag_node, DeploymentNode):
deployment = dag_node._body
# In case same deployment is used in multiple DAGNodes
deployments[deployment.name] = deployment
serve_dag_root._apply_recursive(extractor)
return list(deployments.values())

View file

@ -5,7 +5,11 @@ from ray import serve
from ray.experimental.dag.input_node import InputNode
from ray.serve.api import Deployment
from ray.serve.config import DeploymentConfig
from ray.serve.pipeline.deployment_node import DeploymentNode
from ray.serve.pipeline.deployment_node import (
DeploymentNode,
DeploymentMethodNode,
)
from ray.serve.pipeline.constants import USE_SYNC_HANDLE_KEY
@serve.deployment
@ -42,7 +46,7 @@ class Actor:
return self.i
def test_disallow_binding_deployments(serve_instance):
def test_disallow_binding_deployments():
with pytest.raises(
AttributeError,
match="DAG building API should only be used for @ray.remote decorated",
@ -69,7 +73,7 @@ async def test_simple_deployment_async(serve_instance):
[],
{},
{},
other_args_to_resolve={"sync_handle": False},
other_args_to_resolve={USE_SYNC_HANDLE_KEY: False},
)
deployment.deploy()
handle = deployment.get_handle(sync=False)
@ -98,7 +102,7 @@ def test_simple_deployment_sync(serve_instance):
[],
{},
{},
other_args_to_resolve={"sync_handle": True},
other_args_to_resolve={USE_SYNC_HANDLE_KEY: True},
)
deployment.deploy()
handle = deployment.get_handle(sync=True)
@ -109,7 +113,7 @@ def test_simple_deployment_sync(serve_instance):
assert ray.get(node.get.execute()) == ray.get(handle.get.remote())
def test_no_input_node_as_init_args(serve_instance):
def test_no_input_node_as_init_args():
"""
User should NOT directly create instances of Deployment or DeploymentNode.
"""
@ -128,7 +132,7 @@ def test_no_input_node_as_init_args(serve_instance):
[InputNode()],
{},
{},
other_args_to_resolve={"sync_handle": True},
other_args_to_resolve={USE_SYNC_HANDLE_KEY: True},
)
with pytest.raises(
ValueError,
@ -139,8 +143,9 @@ def test_no_input_node_as_init_args(serve_instance):
[],
{"a": InputNode()},
{},
other_args_to_resolve={"sync_handle": True},
other_args_to_resolve={USE_SYNC_HANDLE_KEY: True},
)
with pytest.raises(
ValueError,
match="cannot be used as args, kwargs, or other_args_to_resolve",
@ -150,7 +155,41 @@ def test_no_input_node_as_init_args(serve_instance):
[],
{},
{},
other_args_to_resolve={"sync_handle": {"options_a": InputNode()}},
other_args_to_resolve={"arg": {"options_a": InputNode()}},
)
def test_invalid_use_sync_handle():
deployment = Deployment(
Actor,
"test",
DeploymentConfig(),
_internal=True,
)
with pytest.raises(
ValueError,
match="use_sync_handle should only be set with a boolean value",
):
_ = DeploymentNode(
deployment,
[],
{},
{},
other_args_to_resolve={USE_SYNC_HANDLE_KEY: {"options_a": InputNode()}},
)
with pytest.raises(
ValueError,
match="use_sync_handle should only be set with a boolean value",
):
_ = DeploymentMethodNode(
deployment,
"method",
[],
{},
{},
other_args_to_resolve={
USE_SYNC_HANDLE_KEY: None,
},
)

View file

@ -0,0 +1,153 @@
import pytest
import ray
from ray import serve
from ray.experimental.dag import InputNode
from ray.serve.pipeline.generate import (
transform_ray_dag_to_serve_dag,
extract_deployments_from_serve_dag,
)
from ray.serve.pipeline.tests.test_modules import Model, Combine
def _validate_consistent_output(
deployment, dag, handle_by_name, input=None, output=None
):
"""Assert same input lead to same outputs across the following:
1) Deployment handle returned from Deployment instance get_handle()
2) Original executable Ray DAG
3) Deployment handle return from serve public API get_deployment()
"""
deployment_handle = deployment.get_handle()
assert ray.get(deployment_handle.remote(input)) == output
assert ray.get(dag.execute(input)) == output
handle_by_name = serve.get_deployment(handle_by_name).get_handle()
assert ray.get(handle_by_name.remote(input)) == output
def test_simple_single_class(serve_instance):
# Assert converting both arg and kwarg
model = Model._bind(2, ratio=0.3)
ray_dag = model.forward._bind(InputNode())
serve_root_dag = ray_dag._apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node)
)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 1
deployments[0].deploy()
_validate_consistent_output(deployments[0], ray_dag, "Model", input=1, output=0.6)
def test_single_class_with_valid_ray_options(serve_instance):
model = Model.options(num_cpus=1, memory=1000)._bind(2, ratio=0.3)
ray_dag = model.forward._bind(InputNode())
serve_root_dag = ray_dag._apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node)
)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 1
deployments[0].deploy()
_validate_consistent_output(
deployments[0], ray_dag, deployments[0].name, input=1, output=0.6
)
deployment = serve.get_deployment(deployments[0].name)
assert deployment.ray_actor_options == {
"num_cpus": 1,
"memory": 1000,
"runtime_env": {},
}
def test_single_class_with_invalid_deployment_options(serve_instance):
model = Model.options(name="my_deployment")._bind(2, ratio=0.3)
ray_dag = model.forward._bind(InputNode())
serve_root_dag = ray_dag._apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node)
)
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 1
with pytest.raises(
ValueError, match="Specifying name in ray_actor_options is not allowed"
):
deployments[0].deploy()
def test_multi_instantiation_class_deployment_in_init_args(serve_instance):
"""
Test we can pass deployments as init_arg or init_kwarg, instantiated
multiple times for the same class, and we can still correctly replace
args with deployment handle and parse correct deployment instances.
"""
m1 = Model._bind(2)
m2 = Model._bind(3)
combine = Combine._bind(m1, m2=m2)
ray_dag = combine.__call__._bind(InputNode())
print(f"Ray DAG: \n{ray_dag}")
serve_root_dag = ray_dag._apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node)
)
print(f"Serve DAG: \n{serve_root_dag}")
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 3
for deployment in deployments:
deployment.deploy()
_validate_consistent_output(deployments[2], ray_dag, "Combine", input=1, output=5)
def test_shared_deployment_handle(serve_instance):
"""
Test we can re-use the same deployment handle multiple times or in
multiple places, without incorrectly parsing duplicated deployments.
"""
m = Model._bind(2)
combine = Combine._bind(m, m2=m)
ray_dag = combine.__call__._bind(InputNode())
print(f"Ray DAG: \n{ray_dag}")
serve_root_dag = ray_dag._apply_recursive(
lambda node: transform_ray_dag_to_serve_dag(node)
)
print(f"Serve DAG: \n{serve_root_dag}")
deployments = extract_deployments_from_serve_dag(serve_root_dag)
assert len(deployments) == 2
for deployment in deployments:
deployment.deploy()
_validate_consistent_output(deployments[1], ray_dag, "Combine", input=1, output=4)
def test_multi_instantiation_class_nested_deployment_arg(serve_instance):
"""
Test we can pass deployments with **nested** init_arg or init_kwarg,
instantiated multiple times for the same class, and we can still correctly
replace args with deployment handle and parse correct deployment instances.
"""
# TODO: (jiaodong) Support nested deployment args
pass
def test_simple_function(serve_instance):
# TODO: (jiaodong) Support function deployment node
pass
def test_multiple_functions(serve_instance):
# TODO: (jiaodong) Support function deployment node
pass
def test_mix_class_and_function(serve_instance):
# TODO: (jiaodong) Support function deployment node
pass
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-v", __file__]))

View file

@ -0,0 +1,56 @@
"""
Ray decorated classes and functions defined at top of file, importable with
fully qualified name as import_path to test DAG building, artifact generation
and structured deployment.
"""
import ray
from typing import TypeVar
RayHandleLike = TypeVar("RayHandleLike")
@ray.remote
class ClassHello:
def __init__(self):
pass
def hello(self):
return "hello"
@ray.remote
class Model:
def __init__(self, weight: int, ratio: float = None):
self.weight = weight
self.ratio = ratio or 1
def forward(self, input: int):
print(f"forward() recevied input: {input}")
return self.ratio * self.weight * input
def __call__(self, request):
print(f"__call__() recevied request: {request}")
input_data = request
return self.ratio * self.weight * input_data
@ray.remote
class Combine:
def __init__(self, m1: "RayHandleLike", m2: "RayHandleLike" = None):
self.m1 = m1
self.m2 = m2
def __call__(self, req):
r1_ref = self.m1.forward.remote(req)
r2_ref = self.m2.forward.remote(req)
return sum(ray.get([r1_ref, r2_ref]))
@ray.remote
def fn_hello():
return "hello"
@ray.remote
def combine(m1_output, m2_output):
return m1_output + m2_output