mirror of
https://github.com/vale981/ray
synced 2025-03-06 18:41:40 -05:00
parent
6f479d4697
commit
fe4c6ab778
12 changed files with 15 additions and 267 deletions
|
@ -39,18 +39,6 @@ REDIS_MODULE = os.path.join(
|
||||||
os.path.abspath(os.path.dirname(__file__)),
|
os.path.abspath(os.path.dirname(__file__)),
|
||||||
"core/src/ray/gcs/redis_module/libray_redis_module.so")
|
"core/src/ray/gcs/redis_module/libray_redis_module.so")
|
||||||
|
|
||||||
# Location of the credis server and modules.
|
|
||||||
# credis will be enabled if the environment variable RAY_USE_NEW_GCS is set.
|
|
||||||
CREDIS_EXECUTABLE = os.path.join(
|
|
||||||
os.path.abspath(os.path.dirname(__file__)),
|
|
||||||
"core/src/credis/redis/src/redis-server" + EXE_SUFFIX)
|
|
||||||
CREDIS_MASTER_MODULE = os.path.join(
|
|
||||||
os.path.abspath(os.path.dirname(__file__)),
|
|
||||||
"core/src/credis/build/src/libmaster.so")
|
|
||||||
CREDIS_MEMBER_MODULE = os.path.join(
|
|
||||||
os.path.abspath(os.path.dirname(__file__)),
|
|
||||||
"core/src/credis/build/src/libmember.so")
|
|
||||||
|
|
||||||
# Location of the plasma object store executable.
|
# Location of the plasma object store executable.
|
||||||
PLASMA_STORE_EXECUTABLE = os.path.join(
|
PLASMA_STORE_EXECUTABLE = os.path.join(
|
||||||
os.path.abspath(os.path.dirname(__file__)),
|
os.path.abspath(os.path.dirname(__file__)),
|
||||||
|
@ -663,7 +651,6 @@ def start_redis(node_ip_address,
|
||||||
redis_max_clients=None,
|
redis_max_clients=None,
|
||||||
redirect_worker_output=False,
|
redirect_worker_output=False,
|
||||||
password=None,
|
password=None,
|
||||||
use_credis=None,
|
|
||||||
fate_share=None):
|
fate_share=None):
|
||||||
"""Start the Redis global state store.
|
"""Start the Redis global state store.
|
||||||
|
|
||||||
|
@ -686,9 +673,6 @@ def start_redis(node_ip_address,
|
||||||
to this value when they start up.
|
to this value when they start up.
|
||||||
password (str): Prevents external clients without the password
|
password (str): Prevents external clients without the password
|
||||||
from connecting to Redis if provided.
|
from connecting to Redis if provided.
|
||||||
use_credis: If True, additionally load the chain-replicated libraries
|
|
||||||
into the redis servers. Defaults to None, which means its value is
|
|
||||||
set by the presence of "RAY_USE_NEW_GCS" in os.environ.
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A tuple of the address for the primary Redis shard, a list of
|
A tuple of the address for the primary Redis shard, a list of
|
||||||
|
@ -708,29 +692,6 @@ def start_redis(node_ip_address,
|
||||||
|
|
||||||
processes = []
|
processes = []
|
||||||
|
|
||||||
if use_credis is None:
|
|
||||||
use_credis = ("RAY_USE_NEW_GCS" in os.environ)
|
|
||||||
if use_credis:
|
|
||||||
if password is not None:
|
|
||||||
# TODO(pschafhalter) remove this once credis supports
|
|
||||||
# authenticating Redis ports
|
|
||||||
raise ValueError("Setting the `redis_password` argument is not "
|
|
||||||
"supported in credis. To run Ray with "
|
|
||||||
"password-protected Redis ports, ensure that "
|
|
||||||
"the environment variable `RAY_USE_NEW_GCS=off`.")
|
|
||||||
assert num_redis_shards == 1, (
|
|
||||||
"For now, RAY_USE_NEW_GCS supports 1 shard, and credis "
|
|
||||||
"supports 1-node chain for that shard only.")
|
|
||||||
|
|
||||||
if use_credis:
|
|
||||||
redis_executable = CREDIS_EXECUTABLE
|
|
||||||
# TODO(suquark): We need credis here because some symbols need to be
|
|
||||||
# imported from credis dynamically through dlopen when Ray is built
|
|
||||||
# with RAY_USE_NEW_GCS=on. We should remove them later for the primary
|
|
||||||
# shard.
|
|
||||||
# See src/ray/gcs/redis_module/ray_redis_module.cc
|
|
||||||
redis_modules = [CREDIS_MASTER_MODULE, REDIS_MODULE]
|
|
||||||
else:
|
|
||||||
redis_executable = REDIS_EXECUTABLE
|
redis_executable = REDIS_EXECUTABLE
|
||||||
redis_modules = [REDIS_MODULE]
|
redis_modules = [REDIS_MODULE]
|
||||||
|
|
||||||
|
@ -777,13 +738,6 @@ def start_redis(node_ip_address,
|
||||||
redis_shards = []
|
redis_shards = []
|
||||||
for i in range(num_redis_shards):
|
for i in range(num_redis_shards):
|
||||||
redis_stdout_file, redis_stderr_file = redirect_files[i + 1]
|
redis_stdout_file, redis_stderr_file = redirect_files[i + 1]
|
||||||
if use_credis:
|
|
||||||
redis_executable = CREDIS_EXECUTABLE
|
|
||||||
# It is important to load the credis module BEFORE the ray module,
|
|
||||||
# as the latter contains an extern declaration that the former
|
|
||||||
# supplies.
|
|
||||||
redis_modules = [CREDIS_MEMBER_MODULE, REDIS_MODULE]
|
|
||||||
else:
|
|
||||||
redis_executable = REDIS_EXECUTABLE
|
redis_executable = REDIS_EXECUTABLE
|
||||||
redis_modules = [REDIS_MODULE]
|
redis_modules = [REDIS_MODULE]
|
||||||
|
|
||||||
|
@ -804,40 +758,6 @@ def start_redis(node_ip_address,
|
||||||
# Store redis shard information in the primary redis shard.
|
# Store redis shard information in the primary redis shard.
|
||||||
primary_redis_client.rpush("RedisShards", shard_address)
|
primary_redis_client.rpush("RedisShards", shard_address)
|
||||||
|
|
||||||
if use_credis:
|
|
||||||
# Configure the chain state. The way it is intended to work is
|
|
||||||
# the following:
|
|
||||||
#
|
|
||||||
# PRIMARY_SHARD
|
|
||||||
#
|
|
||||||
# SHARD_1 (master replica) -> SHARD_1 (member replica)
|
|
||||||
# -> SHARD_1 (member replica)
|
|
||||||
#
|
|
||||||
# SHARD_2 (master replica) -> SHARD_2 (member replica)
|
|
||||||
# -> SHARD_2 (member replica)
|
|
||||||
# ...
|
|
||||||
#
|
|
||||||
#
|
|
||||||
# If we have credis members in future, their modules should be:
|
|
||||||
# [CREDIS_MEMBER_MODULE, REDIS_MODULE], and they will be initialized by
|
|
||||||
# execute_command("MEMBER.CONNECT_TO_MASTER", node_ip_address, port)
|
|
||||||
#
|
|
||||||
# Currently we have num_redis_shards == 1, so only one chain will be
|
|
||||||
# created, and the chain only contains master.
|
|
||||||
|
|
||||||
# TODO(suquark): Currently, this is not correct because we are
|
|
||||||
# using the master replica as the primary shard. This should be
|
|
||||||
# fixed later. I had tried to fix it but failed because of heartbeat
|
|
||||||
# issues.
|
|
||||||
primary_client = redis.StrictRedis(
|
|
||||||
host=node_ip_address, port=port, password=password)
|
|
||||||
shard_client = redis.StrictRedis(
|
|
||||||
host=node_ip_address, port=redis_shard_port, password=password)
|
|
||||||
primary_client.execute_command("MASTER.ADD", node_ip_address,
|
|
||||||
redis_shard_port)
|
|
||||||
shard_client.execute_command("MEMBER.CONNECT_TO_MASTER",
|
|
||||||
node_ip_address, port)
|
|
||||||
|
|
||||||
return redis_address, redis_shards, processes
|
return redis_address, redis_shards, processes
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -189,9 +189,6 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head):
|
||||||
ray.get(x_id)
|
ray.get(x_id)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Hanging with new GCS API.")
|
|
||||||
def test_actor_init_fails(ray_start_cluster_head):
|
def test_actor_init_fails(ray_start_cluster_head):
|
||||||
cluster = ray_start_cluster_head
|
cluster = ray_start_cluster_head
|
||||||
remote_node = cluster.add_node()
|
remote_node = cluster.add_node()
|
||||||
|
@ -347,9 +344,6 @@ def test_distributed_handle(ray_start_cluster_2_nodes):
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skip("This test does not work yet.")
|
@pytest.mark.skip("This test does not work yet.")
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Hanging with new GCS API.")
|
|
||||||
def test_remote_checkpoint_distributed_handle(ray_start_cluster_2_nodes):
|
def test_remote_checkpoint_distributed_handle(ray_start_cluster_2_nodes):
|
||||||
cluster = ray_start_cluster_2_nodes
|
cluster = ray_start_cluster_2_nodes
|
||||||
counter, ids = setup_counter_actor(test_checkpoint=True)
|
counter, ids = setup_counter_actor(test_checkpoint=True)
|
||||||
|
|
|
@ -79,9 +79,6 @@ def test_actor_class_methods(ray_start_regular):
|
||||||
assert ray.get(a.g.remote(2)) == 4
|
assert ray.get(a.g.remote(2)) == 4
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Failing with new GCS API on Linux.")
|
|
||||||
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
|
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
|
||||||
def test_actor_gpus(ray_start_cluster):
|
def test_actor_gpus(ray_start_cluster):
|
||||||
cluster = ray_start_cluster
|
cluster = ray_start_cluster
|
||||||
|
|
|
@ -257,9 +257,6 @@ def test_not_logging_to_driver(shutdown_only):
|
||||||
assert len(err_lines) == 0
|
assert len(err_lines) == 0
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="New GCS API doesn't have a Python API yet.")
|
|
||||||
def test_workers(shutdown_only):
|
def test_workers(shutdown_only):
|
||||||
num_workers = 3
|
num_workers = 3
|
||||||
ray.init(num_cpus=num_workers)
|
ray.init(num_cpus=num_workers)
|
||||||
|
|
|
@ -14,9 +14,6 @@ SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
|
||||||
|
|
||||||
# This test checks that when a worker dies in the middle of a get, the plasma
|
# This test checks that when a worker dies in the middle of a get, the plasma
|
||||||
# store and raylet will not die.
|
# store and raylet will not die.
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Not working with new GCS API.")
|
|
||||||
def test_dying_worker_get(ray_start_2_cpus):
|
def test_dying_worker_get(ray_start_2_cpus):
|
||||||
@ray.remote
|
@ray.remote
|
||||||
def sleep_forever(signal):
|
def sleep_forever(signal):
|
||||||
|
@ -65,9 +62,6 @@ def test_dying_worker_get(ray_start_2_cpus):
|
||||||
|
|
||||||
# This test checks that when a driver dies in the middle of a get, the plasma
|
# This test checks that when a driver dies in the middle of a get, the plasma
|
||||||
# store and raylet will not die.
|
# store and raylet will not die.
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Not working with new GCS API.")
|
|
||||||
def test_dying_driver_get(ray_start_regular):
|
def test_dying_driver_get(ray_start_regular):
|
||||||
# Start the Ray processes.
|
# Start the Ray processes.
|
||||||
address_info = ray_start_regular
|
address_info = ray_start_regular
|
||||||
|
@ -109,9 +103,6 @@ ray.get(ray.ObjectRef(ray.utils.hex_to_binary("{}")))
|
||||||
|
|
||||||
# This test checks that when a worker dies in the middle of a wait, the plasma
|
# This test checks that when a worker dies in the middle of a wait, the plasma
|
||||||
# store and raylet will not die.
|
# store and raylet will not die.
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Not working with new GCS API.")
|
|
||||||
def test_dying_worker_wait(ray_start_2_cpus):
|
def test_dying_worker_wait(ray_start_2_cpus):
|
||||||
@ray.remote
|
@ray.remote
|
||||||
def sleep_forever():
|
def sleep_forever():
|
||||||
|
@ -150,9 +141,6 @@ def test_dying_worker_wait(ray_start_2_cpus):
|
||||||
|
|
||||||
# This test checks that when a driver dies in the middle of a wait, the plasma
|
# This test checks that when a driver dies in the middle of a wait, the plasma
|
||||||
# store and raylet will not die.
|
# store and raylet will not die.
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Not working with new GCS API.")
|
|
||||||
def test_dying_driver_wait(ray_start_regular):
|
def test_dying_driver_wait(ray_start_regular):
|
||||||
# Start the Ray processes.
|
# Start the Ray processes.
|
||||||
address_info = ray_start_regular
|
address_info = ray_start_regular
|
||||||
|
@ -193,5 +181,4 @@ ray.wait([ray.ObjectRef(ray.utils.hex_to_binary("{}"))])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import pytest
|
|
||||||
sys.exit(pytest.main(["-v", __file__]))
|
sys.exit(pytest.main(["-v", __file__]))
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
import os
|
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -75,9 +74,6 @@ def test_actor_creation_node_failure(ray_start_cluster):
|
||||||
cluster.remove_node(get_other_nodes(cluster, True)[-1])
|
cluster.remove_node(get_other_nodes(cluster, True)[-1])
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Hanging with new GCS API.")
|
|
||||||
def test_driver_lives_sequential(ray_start_regular):
|
def test_driver_lives_sequential(ray_start_regular):
|
||||||
ray.worker._global_node.kill_raylet()
|
ray.worker._global_node.kill_raylet()
|
||||||
ray.worker._global_node.kill_plasma_store()
|
ray.worker._global_node.kill_plasma_store()
|
||||||
|
@ -88,9 +84,6 @@ def test_driver_lives_sequential(ray_start_regular):
|
||||||
# If the driver can reach the tearDown method, then it is still alive.
|
# If the driver can reach the tearDown method, then it is still alive.
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Hanging with new GCS API.")
|
|
||||||
def test_driver_lives_parallel(ray_start_regular):
|
def test_driver_lives_parallel(ray_start_regular):
|
||||||
all_processes = ray.worker._global_node.all_processes
|
all_processes = ray.worker._global_node.all_processes
|
||||||
|
|
||||||
|
|
|
@ -419,7 +419,6 @@ def test_calling_start_ray_head(call_ray_stop_only):
|
||||||
check_call_ray(["start", "--head", "--redis-max-clients", "100"])
|
check_call_ray(["start", "--head", "--redis-max-clients", "100"])
|
||||||
check_call_ray(["stop"])
|
check_call_ray(["stop"])
|
||||||
|
|
||||||
if "RAY_USE_NEW_GCS" not in os.environ:
|
|
||||||
# Test starting Ray with redis shard ports specified.
|
# Test starting Ray with redis shard ports specified.
|
||||||
check_call_ray(
|
check_call_ray(
|
||||||
["start", "--head", "--redis-shard-ports", "6380,6381,6382"])
|
["start", "--head", "--redis-shard-ports", "6380,6381,6382"])
|
||||||
|
@ -428,8 +427,8 @@ def test_calling_start_ray_head(call_ray_stop_only):
|
||||||
# Test starting Ray with all arguments specified.
|
# Test starting Ray with all arguments specified.
|
||||||
check_call_ray([
|
check_call_ray([
|
||||||
"start", "--head", "--redis-shard-ports", "6380,6381,6382",
|
"start", "--head", "--redis-shard-ports", "6380,6381,6382",
|
||||||
"--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus",
|
"--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", "0",
|
||||||
"0", "--redis-max-clients", "100", "--resources", "{\"Custom\": 1}"
|
"--redis-max-clients", "100", "--resources", "{\"Custom\": 1}"
|
||||||
])
|
])
|
||||||
check_call_ray(["stop"])
|
check_call_ray(["stop"])
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
import os
|
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -124,9 +123,6 @@ def test_actor_creation_node_failure(ray_start_cluster):
|
||||||
cluster.remove_node(get_other_nodes(cluster, True)[-1])
|
cluster.remove_node(get_other_nodes(cluster, True)[-1])
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Hanging with new GCS API.")
|
|
||||||
def test_driver_lives_sequential(ray_start_regular):
|
def test_driver_lives_sequential(ray_start_regular):
|
||||||
ray.worker._global_node.kill_raylet()
|
ray.worker._global_node.kill_raylet()
|
||||||
ray.worker._global_node.kill_plasma_store()
|
ray.worker._global_node.kill_plasma_store()
|
||||||
|
@ -137,9 +133,6 @@ def test_driver_lives_sequential(ray_start_regular):
|
||||||
# If the driver can reach the tearDown method, then it is still alive.
|
# If the driver can reach the tearDown method, then it is still alive.
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="Hanging with new GCS API.")
|
|
||||||
def test_driver_lives_parallel(ray_start_regular):
|
def test_driver_lives_parallel(ray_start_regular):
|
||||||
all_processes = ray.worker._global_node.all_processes
|
all_processes = ray.worker._global_node.all_processes
|
||||||
|
|
||||||
|
|
|
@ -15,9 +15,6 @@ def password():
|
||||||
|
|
||||||
|
|
||||||
class TestRedisPassword:
|
class TestRedisPassword:
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="New GCS API doesn't support Redis authentication yet.")
|
|
||||||
def test_redis_password(self, password, shutdown_only):
|
def test_redis_password(self, password, shutdown_only):
|
||||||
@ray.remote
|
@ray.remote
|
||||||
def f():
|
def f():
|
||||||
|
@ -42,9 +39,6 @@ class TestRedisPassword:
|
||||||
host=redis_ip, port=redis_port, password=password)
|
host=redis_ip, port=redis_port, password=password)
|
||||||
assert redis_client.ping()
|
assert redis_client.ping()
|
||||||
|
|
||||||
@pytest.mark.skipif(
|
|
||||||
os.environ.get("RAY_USE_NEW_GCS") == "on",
|
|
||||||
reason="New GCS API doesn't support Redis authentication yet.")
|
|
||||||
def test_redis_password_cluster(self, password, shutdown_only):
|
def test_redis_password_cluster(self, password, shutdown_only):
|
||||||
@ray.remote
|
@ray.remote
|
||||||
def f():
|
def f():
|
||||||
|
|
|
@ -98,13 +98,6 @@ optional_ray_files += ray_autoscaler_files
|
||||||
optional_ray_files += ray_project_files
|
optional_ray_files += ray_project_files
|
||||||
optional_ray_files += ray_dashboard_files
|
optional_ray_files += ray_dashboard_files
|
||||||
|
|
||||||
if os.getenv("RAY_USE_NEW_GCS") == "on":
|
|
||||||
ray_files += [
|
|
||||||
"ray/core/src/credis/build/src/libmember.so",
|
|
||||||
"ray/core/src/credis/build/src/libmaster.so",
|
|
||||||
"ray/core/src/credis/redis/src/redis-server" + exe_suffix,
|
|
||||||
]
|
|
||||||
|
|
||||||
# If you're adding dependencies for ray extras, please
|
# If you're adding dependencies for ray extras, please
|
||||||
# also update the matching section of requirements.txt
|
# also update the matching section of requirements.txt
|
||||||
# in this directory
|
# in this directory
|
||||||
|
|
|
@ -1,73 +0,0 @@
|
||||||
// Copyright 2017 The Ray Authors.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
#pragma once
|
|
||||||
|
|
||||||
#include <functional>
|
|
||||||
|
|
||||||
#include "ray/gcs/redis_module/redismodule.h"
|
|
||||||
|
|
||||||
// NOTE(zongheng): this duplicated declaration serves as forward-declaration
|
|
||||||
// only. The implementation is supposed to be linked in from credis. In
|
|
||||||
// principle, we can expose a header from credis and simple include that header.
|
|
||||||
// This is left as future work.
|
|
||||||
//
|
|
||||||
// Concrete definitions from credis (from an example commit):
|
|
||||||
// https://github.com/ray-project/credis/blob/7eae7f2e58d16dfa1a95b5dfab02549f54b94e5d/src/member.cc#L41
|
|
||||||
// https://github.com/ray-project/credis/blob/7eae7f2e58d16dfa1a95b5dfab02549f54b94e5d/src/master.cc#L36
|
|
||||||
|
|
||||||
// Typical usage to make an existing redismodule command chain-compatible:
|
|
||||||
//
|
|
||||||
// extern RedisChainModule module;
|
|
||||||
// int MyCmd_RedisModuleCmd(...) {
|
|
||||||
// return module.Mutate(..., NodeFunc, TailFunc);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// See, for instance, ChainTableAdd_RedisCommand in ray_redis_module.cc.
|
|
||||||
class RedisChainModule {
|
|
||||||
public:
|
|
||||||
// A function that runs on every node in the chain. Type:
|
|
||||||
// (context, argv, argc, (can be nullptr) mutated_key_str) -> int
|
|
||||||
//
|
|
||||||
// (Advanced) The optional fourth arg can be used in the following way:
|
|
||||||
//
|
|
||||||
// RedisModuleString* redis_key_str = nullptr;
|
|
||||||
// node_func(ctx, argv, argc, &redis_key_str);
|
|
||||||
// // "redis_key_str" now points to the RedisModuleString whose contents
|
|
||||||
// // is mutated by "node_func".
|
|
||||||
//
|
|
||||||
// If the fourth arg is passed, NodeFunc *must* fill in the key being mutated.
|
|
||||||
// It is okay for this NodeFunc to call "RM_FreeString(mutated_key_str)" after
|
|
||||||
// assigning the fourth arg, since that call presumably only decrements a ref
|
|
||||||
// count.
|
|
||||||
using NodeFunc = std::function<int(RedisModuleCtx *, RedisModuleString **, int,
|
|
||||||
RedisModuleString **)>;
|
|
||||||
|
|
||||||
// A function that (1) runs only after all NodeFunc's have run, and (2) runs
|
|
||||||
// once on the tail. A typical usage is to publish a write.
|
|
||||||
using TailFunc = std::function<int(RedisModuleCtx *, RedisModuleString **, int)>;
|
|
||||||
|
|
||||||
// TODO(zongheng): document the RM_Reply semantics.
|
|
||||||
|
|
||||||
// Runs "node_func" on every node in the chain; after the tail node has run it
|
|
||||||
// too, finalizes the mutation by running "tail_func".
|
|
||||||
//
|
|
||||||
// If node_func() returns non-zero, it is treated as an error and the entire
|
|
||||||
// update will terminate early, without running subsequent node_func() and the
|
|
||||||
// final tail_func().
|
|
||||||
//
|
|
||||||
// TODO(zongheng): currently only supports 1-node chain.
|
|
||||||
int ChainReplicate(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
|
|
||||||
NodeFunc node_func, TailFunc tail_func);
|
|
||||||
};
|
|
|
@ -30,19 +30,6 @@ using ray::rpc::GcsEntry;
|
||||||
using ray::rpc::TablePrefix;
|
using ray::rpc::TablePrefix;
|
||||||
using ray::rpc::TablePubsub;
|
using ray::rpc::TablePubsub;
|
||||||
|
|
||||||
#if RAY_USE_NEW_GCS
|
|
||||||
// Under this flag, ray-project/credis will be loaded. Specifically, via
|
|
||||||
// "path/redis-server --loadmodule <credis module> --loadmodule <current
|
|
||||||
// libray_redis_module>" (dlopen() under the hood) will a definition of "module"
|
|
||||||
// be supplied.
|
|
||||||
//
|
|
||||||
// All commands in this file that depend on "module" must be wrapped by "#if
|
|
||||||
// RAY_USE_NEW_GCS", until we switch to this launch configuration as the
|
|
||||||
// default.
|
|
||||||
#include "ray/gcs/redis_module/chain_module.h"
|
|
||||||
extern RedisChainModule module;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#define REPLY_AND_RETURN_IF_FALSE(CONDITION, MESSAGE) \
|
#define REPLY_AND_RETURN_IF_FALSE(CONDITION, MESSAGE) \
|
||||||
if (!(CONDITION)) { \
|
if (!(CONDITION)) { \
|
||||||
RedisModule_ReplyWithError(ctx, (MESSAGE)); \
|
RedisModule_ReplyWithError(ctx, (MESSAGE)); \
|
||||||
|
@ -328,13 +315,6 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
|
||||||
return TableAdd_DoPublish(ctx, argv, argc);
|
return TableAdd_DoPublish(ctx, argv, argc);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if RAY_USE_NEW_GCS
|
|
||||||
int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|
||||||
return module.ChainReplicate(ctx, argv, argc, /*node_func=*/TableAdd_DoWrite,
|
|
||||||
/*tail_func=*/TableAdd_DoPublish);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int TableAppend_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
|
int TableAppend_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
|
||||||
RedisModuleString **mutated_key_str) {
|
RedisModuleString **mutated_key_str) {
|
||||||
if (argc < 5 || argc > 6) {
|
if (argc < 5 || argc > 6) {
|
||||||
|
@ -435,15 +415,6 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
|
||||||
return TableAppend_DoPublish(ctx, argv, argc);
|
return TableAppend_DoPublish(ctx, argv, argc);
|
||||||
}
|
}
|
||||||
|
|
||||||
#if RAY_USE_NEW_GCS
|
|
||||||
int ChainTableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
|
||||||
int argc) {
|
|
||||||
return module.ChainReplicate(ctx, argv, argc,
|
|
||||||
/*node_func=*/TableAppend_DoWrite,
|
|
||||||
/*tail_func=*/TableAppend_DoPublish);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int Set_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, bool is_add) {
|
int Set_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, bool is_add) {
|
||||||
RedisModuleString *pubsub_channel_str = argv[2];
|
RedisModuleString *pubsub_channel_str = argv[2];
|
||||||
RedisModuleString *id = argv[3];
|
RedisModuleString *id = argv[3];
|
||||||
|
@ -983,10 +954,6 @@ AUTO_MEMORY(TableRequestNotifications_RedisCommand);
|
||||||
AUTO_MEMORY(TableDelete_RedisCommand);
|
AUTO_MEMORY(TableDelete_RedisCommand);
|
||||||
AUTO_MEMORY(TableCancelNotifications_RedisCommand);
|
AUTO_MEMORY(TableCancelNotifications_RedisCommand);
|
||||||
AUTO_MEMORY(DebugString_RedisCommand);
|
AUTO_MEMORY(DebugString_RedisCommand);
|
||||||
#if RAY_USE_NEW_GCS
|
|
||||||
AUTO_MEMORY(ChainTableAdd_RedisCommand);
|
|
||||||
AUTO_MEMORY(ChainTableAppend_RedisCommand);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
|
||||||
|
@ -1055,19 +1022,6 @@ __declspec(dllexport)
|
||||||
return REDISMODULE_ERR;
|
return REDISMODULE_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if RAY_USE_NEW_GCS
|
|
||||||
// Chain-enabled commands that depend on ray-project/credis.
|
|
||||||
if (RedisModule_CreateCommand(ctx, "ray.chain.table_add", ChainTableAdd_RedisCommand,
|
|
||||||
"write pubsub", 0, 0, 0) == REDISMODULE_ERR) {
|
|
||||||
return REDISMODULE_ERR;
|
|
||||||
}
|
|
||||||
if (RedisModule_CreateCommand(ctx, "ray.chain.table_append",
|
|
||||||
ChainTableAppend_RedisCommand, "write pubsub", 0, 0,
|
|
||||||
0) == REDISMODULE_ERR) {
|
|
||||||
return REDISMODULE_ERR;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue