diff --git a/python/ray/services.py b/python/ray/services.py index 997de3256..9ce6a6e66 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -39,18 +39,6 @@ REDIS_MODULE = os.path.join( os.path.abspath(os.path.dirname(__file__)), "core/src/ray/gcs/redis_module/libray_redis_module.so") -# Location of the credis server and modules. -# credis will be enabled if the environment variable RAY_USE_NEW_GCS is set. -CREDIS_EXECUTABLE = os.path.join( - os.path.abspath(os.path.dirname(__file__)), - "core/src/credis/redis/src/redis-server" + EXE_SUFFIX) -CREDIS_MASTER_MODULE = os.path.join( - os.path.abspath(os.path.dirname(__file__)), - "core/src/credis/build/src/libmaster.so") -CREDIS_MEMBER_MODULE = os.path.join( - os.path.abspath(os.path.dirname(__file__)), - "core/src/credis/build/src/libmember.so") - # Location of the plasma object store executable. PLASMA_STORE_EXECUTABLE = os.path.join( os.path.abspath(os.path.dirname(__file__)), @@ -663,7 +651,6 @@ def start_redis(node_ip_address, redis_max_clients=None, redirect_worker_output=False, password=None, - use_credis=None, fate_share=None): """Start the Redis global state store. @@ -686,9 +673,6 @@ def start_redis(node_ip_address, to this value when they start up. password (str): Prevents external clients without the password from connecting to Redis if provided. - use_credis: If True, additionally load the chain-replicated libraries - into the redis servers. Defaults to None, which means its value is - set by the presence of "RAY_USE_NEW_GCS" in os.environ. Returns: A tuple of the address for the primary Redis shard, a list of @@ -708,31 +692,8 @@ def start_redis(node_ip_address, processes = [] - if use_credis is None: - use_credis = ("RAY_USE_NEW_GCS" in os.environ) - if use_credis: - if password is not None: - # TODO(pschafhalter) remove this once credis supports - # authenticating Redis ports - raise ValueError("Setting the `redis_password` argument is not " - "supported in credis. To run Ray with " - "password-protected Redis ports, ensure that " - "the environment variable `RAY_USE_NEW_GCS=off`.") - assert num_redis_shards == 1, ( - "For now, RAY_USE_NEW_GCS supports 1 shard, and credis " - "supports 1-node chain for that shard only.") - - if use_credis: - redis_executable = CREDIS_EXECUTABLE - # TODO(suquark): We need credis here because some symbols need to be - # imported from credis dynamically through dlopen when Ray is built - # with RAY_USE_NEW_GCS=on. We should remove them later for the primary - # shard. - # See src/ray/gcs/redis_module/ray_redis_module.cc - redis_modules = [CREDIS_MASTER_MODULE, REDIS_MODULE] - else: - redis_executable = REDIS_EXECUTABLE - redis_modules = [REDIS_MODULE] + redis_executable = REDIS_EXECUTABLE + redis_modules = [REDIS_MODULE] redis_stdout_file, redis_stderr_file = redirect_files[0] # Start the primary Redis shard. @@ -777,15 +738,8 @@ def start_redis(node_ip_address, redis_shards = [] for i in range(num_redis_shards): redis_stdout_file, redis_stderr_file = redirect_files[i + 1] - if use_credis: - redis_executable = CREDIS_EXECUTABLE - # It is important to load the credis module BEFORE the ray module, - # as the latter contains an extern declaration that the former - # supplies. - redis_modules = [CREDIS_MEMBER_MODULE, REDIS_MODULE] - else: - redis_executable = REDIS_EXECUTABLE - redis_modules = [REDIS_MODULE] + redis_executable = REDIS_EXECUTABLE + redis_modules = [REDIS_MODULE] redis_shard_port, p = _start_redis_instance( redis_executable, @@ -804,40 +758,6 @@ def start_redis(node_ip_address, # Store redis shard information in the primary redis shard. primary_redis_client.rpush("RedisShards", shard_address) - if use_credis: - # Configure the chain state. The way it is intended to work is - # the following: - # - # PRIMARY_SHARD - # - # SHARD_1 (master replica) -> SHARD_1 (member replica) - # -> SHARD_1 (member replica) - # - # SHARD_2 (master replica) -> SHARD_2 (member replica) - # -> SHARD_2 (member replica) - # ... - # - # - # If we have credis members in future, their modules should be: - # [CREDIS_MEMBER_MODULE, REDIS_MODULE], and they will be initialized by - # execute_command("MEMBER.CONNECT_TO_MASTER", node_ip_address, port) - # - # Currently we have num_redis_shards == 1, so only one chain will be - # created, and the chain only contains master. - - # TODO(suquark): Currently, this is not correct because we are - # using the master replica as the primary shard. This should be - # fixed later. I had tried to fix it but failed because of heartbeat - # issues. - primary_client = redis.StrictRedis( - host=node_ip_address, port=port, password=password) - shard_client = redis.StrictRedis( - host=node_ip_address, port=redis_shard_port, password=password) - primary_client.execute_command("MASTER.ADD", node_ip_address, - redis_shard_port) - shard_client.execute_command("MEMBER.CONNECT_TO_MASTER", - node_ip_address, port) - return redis_address, redis_shards, processes diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index 0e018e3ab..cdffe6afa 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -189,9 +189,6 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head): ray.get(x_id) -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Hanging with new GCS API.") def test_actor_init_fails(ray_start_cluster_head): cluster = ray_start_cluster_head remote_node = cluster.add_node() @@ -347,9 +344,6 @@ def test_distributed_handle(ray_start_cluster_2_nodes): @pytest.mark.skip("This test does not work yet.") -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Hanging with new GCS API.") def test_remote_checkpoint_distributed_handle(ray_start_cluster_2_nodes): cluster = ray_start_cluster_2_nodes counter, ids = setup_counter_actor(test_checkpoint=True) diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index 66fb5fe09..f0c254c5f 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -79,9 +79,6 @@ def test_actor_class_methods(ray_start_regular): assert ray.get(a.g.remote(2)) == 4 -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Failing with new GCS API on Linux.") @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") def test_actor_gpus(ray_start_cluster): cluster = ray_start_cluster diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 982dd96aa..9eb0308d0 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -257,9 +257,6 @@ def test_not_logging_to_driver(shutdown_only): assert len(err_lines) == 0 -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="New GCS API doesn't have a Python API yet.") def test_workers(shutdown_only): num_workers = 3 ray.init(num_cpus=num_workers) diff --git a/python/ray/tests/test_component_failures.py b/python/ray/tests/test_component_failures.py index 601f12fa4..884f77edb 100644 --- a/python/ray/tests/test_component_failures.py +++ b/python/ray/tests/test_component_failures.py @@ -14,9 +14,6 @@ SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM # This test checks that when a worker dies in the middle of a get, the plasma # store and raylet will not die. -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Not working with new GCS API.") def test_dying_worker_get(ray_start_2_cpus): @ray.remote def sleep_forever(signal): @@ -65,9 +62,6 @@ def test_dying_worker_get(ray_start_2_cpus): # This test checks that when a driver dies in the middle of a get, the plasma # store and raylet will not die. -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Not working with new GCS API.") def test_dying_driver_get(ray_start_regular): # Start the Ray processes. address_info = ray_start_regular @@ -109,9 +103,6 @@ ray.get(ray.ObjectRef(ray.utils.hex_to_binary("{}"))) # This test checks that when a worker dies in the middle of a wait, the plasma # store and raylet will not die. -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Not working with new GCS API.") def test_dying_worker_wait(ray_start_2_cpus): @ray.remote def sleep_forever(): @@ -150,9 +141,6 @@ def test_dying_worker_wait(ray_start_2_cpus): # This test checks that when a driver dies in the middle of a wait, the plasma # store and raylet will not die. -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Not working with new GCS API.") def test_dying_driver_wait(ray_start_regular): # Start the Ray processes. address_info = ray_start_regular @@ -193,5 +181,4 @@ ray.wait([ray.ObjectRef(ray.utils.hex_to_binary("{}"))]) if __name__ == "__main__": - import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_component_failures_3.py b/python/ray/tests/test_component_failures_3.py index b910f22d3..ae6c21fe4 100644 --- a/python/ray/tests/test_component_failures_3.py +++ b/python/ray/tests/test_component_failures_3.py @@ -1,4 +1,3 @@ -import os import sys import time @@ -75,9 +74,6 @@ def test_actor_creation_node_failure(ray_start_cluster): cluster.remove_node(get_other_nodes(cluster, True)[-1]) -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Hanging with new GCS API.") def test_driver_lives_sequential(ray_start_regular): ray.worker._global_node.kill_raylet() ray.worker._global_node.kill_plasma_store() @@ -88,9 +84,6 @@ def test_driver_lives_sequential(ray_start_regular): # If the driver can reach the tearDown method, then it is still alive. -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Hanging with new GCS API.") def test_driver_lives_parallel(ray_start_regular): all_processes = ray.worker._global_node.all_processes diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index 4247e340f..7b168dc46 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -419,19 +419,18 @@ def test_calling_start_ray_head(call_ray_stop_only): check_call_ray(["start", "--head", "--redis-max-clients", "100"]) check_call_ray(["stop"]) - if "RAY_USE_NEW_GCS" not in os.environ: - # Test starting Ray with redis shard ports specified. - check_call_ray( - ["start", "--head", "--redis-shard-ports", "6380,6381,6382"]) - check_call_ray(["stop"]) + # Test starting Ray with redis shard ports specified. + check_call_ray( + ["start", "--head", "--redis-shard-ports", "6380,6381,6382"]) + check_call_ray(["stop"]) - # Test starting Ray with all arguments specified. - check_call_ray([ - "start", "--head", "--redis-shard-ports", "6380,6381,6382", - "--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", - "0", "--redis-max-clients", "100", "--resources", "{\"Custom\": 1}" - ]) - check_call_ray(["stop"]) + # Test starting Ray with all arguments specified. + check_call_ray([ + "start", "--head", "--redis-shard-ports", "6380,6381,6382", + "--object-manager-port", "12345", "--num-cpus", "2", "--num-gpus", "0", + "--redis-max-clients", "100", "--resources", "{\"Custom\": 1}" + ]) + check_call_ray(["stop"]) # Test starting Ray with invalid arguments. with pytest.raises(subprocess.CalledProcessError): diff --git a/python/ray/tests/test_multinode_failures_2.py b/python/ray/tests/test_multinode_failures_2.py index 6de892d36..be855306c 100644 --- a/python/ray/tests/test_multinode_failures_2.py +++ b/python/ray/tests/test_multinode_failures_2.py @@ -1,4 +1,3 @@ -import os import sys import time @@ -124,9 +123,6 @@ def test_actor_creation_node_failure(ray_start_cluster): cluster.remove_node(get_other_nodes(cluster, True)[-1]) -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Hanging with new GCS API.") def test_driver_lives_sequential(ray_start_regular): ray.worker._global_node.kill_raylet() ray.worker._global_node.kill_plasma_store() @@ -137,9 +133,6 @@ def test_driver_lives_sequential(ray_start_regular): # If the driver can reach the tearDown method, then it is still alive. -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="Hanging with new GCS API.") def test_driver_lives_parallel(ray_start_regular): all_processes = ray.worker._global_node.all_processes diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index 2b9b66f06..ad9f96ca2 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -15,9 +15,6 @@ def password(): class TestRedisPassword: - @pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="New GCS API doesn't support Redis authentication yet.") def test_redis_password(self, password, shutdown_only): @ray.remote def f(): @@ -42,9 +39,6 @@ class TestRedisPassword: host=redis_ip, port=redis_port, password=password) assert redis_client.ping() - @pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="New GCS API doesn't support Redis authentication yet.") def test_redis_password_cluster(self, password, shutdown_only): @ray.remote def f(): diff --git a/python/setup.py b/python/setup.py index 89cede854..75f1945bc 100644 --- a/python/setup.py +++ b/python/setup.py @@ -98,13 +98,6 @@ optional_ray_files += ray_autoscaler_files optional_ray_files += ray_project_files optional_ray_files += ray_dashboard_files -if os.getenv("RAY_USE_NEW_GCS") == "on": - ray_files += [ - "ray/core/src/credis/build/src/libmember.so", - "ray/core/src/credis/build/src/libmaster.so", - "ray/core/src/credis/redis/src/redis-server" + exe_suffix, - ] - # If you're adding dependencies for ray extras, please # also update the matching section of requirements.txt # in this directory diff --git a/src/ray/gcs/redis_module/chain_module.h b/src/ray/gcs/redis_module/chain_module.h deleted file mode 100644 index 574d2325e..000000000 --- a/src/ray/gcs/redis_module/chain_module.h +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2017 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -#include "ray/gcs/redis_module/redismodule.h" - -// NOTE(zongheng): this duplicated declaration serves as forward-declaration -// only. The implementation is supposed to be linked in from credis. In -// principle, we can expose a header from credis and simple include that header. -// This is left as future work. -// -// Concrete definitions from credis (from an example commit): -// https://github.com/ray-project/credis/blob/7eae7f2e58d16dfa1a95b5dfab02549f54b94e5d/src/member.cc#L41 -// https://github.com/ray-project/credis/blob/7eae7f2e58d16dfa1a95b5dfab02549f54b94e5d/src/master.cc#L36 - -// Typical usage to make an existing redismodule command chain-compatible: -// -// extern RedisChainModule module; -// int MyCmd_RedisModuleCmd(...) { -// return module.Mutate(..., NodeFunc, TailFunc); -// } -// -// See, for instance, ChainTableAdd_RedisCommand in ray_redis_module.cc. -class RedisChainModule { - public: - // A function that runs on every node in the chain. Type: - // (context, argv, argc, (can be nullptr) mutated_key_str) -> int - // - // (Advanced) The optional fourth arg can be used in the following way: - // - // RedisModuleString* redis_key_str = nullptr; - // node_func(ctx, argv, argc, &redis_key_str); - // // "redis_key_str" now points to the RedisModuleString whose contents - // // is mutated by "node_func". - // - // If the fourth arg is passed, NodeFunc *must* fill in the key being mutated. - // It is okay for this NodeFunc to call "RM_FreeString(mutated_key_str)" after - // assigning the fourth arg, since that call presumably only decrements a ref - // count. - using NodeFunc = std::function; - - // A function that (1) runs only after all NodeFunc's have run, and (2) runs - // once on the tail. A typical usage is to publish a write. - using TailFunc = std::function; - - // TODO(zongheng): document the RM_Reply semantics. - - // Runs "node_func" on every node in the chain; after the tail node has run it - // too, finalizes the mutation by running "tail_func". - // - // If node_func() returns non-zero, it is treated as an error and the entire - // update will terminate early, without running subsequent node_func() and the - // final tail_func(). - // - // TODO(zongheng): currently only supports 1-node chain. - int ChainReplicate(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, - NodeFunc node_func, TailFunc tail_func); -}; diff --git a/src/ray/gcs/redis_module/ray_redis_module.cc b/src/ray/gcs/redis_module/ray_redis_module.cc index 755732cf6..c62ce59c6 100644 --- a/src/ray/gcs/redis_module/ray_redis_module.cc +++ b/src/ray/gcs/redis_module/ray_redis_module.cc @@ -30,19 +30,6 @@ using ray::rpc::GcsEntry; using ray::rpc::TablePrefix; using ray::rpc::TablePubsub; -#if RAY_USE_NEW_GCS -// Under this flag, ray-project/credis will be loaded. Specifically, via -// "path/redis-server --loadmodule --loadmodule " (dlopen() under the hood) will a definition of "module" -// be supplied. -// -// All commands in this file that depend on "module" must be wrapped by "#if -// RAY_USE_NEW_GCS", until we switch to this launch configuration as the -// default. -#include "ray/gcs/redis_module/chain_module.h" -extern RedisChainModule module; -#endif - #define REPLY_AND_RETURN_IF_FALSE(CONDITION, MESSAGE) \ if (!(CONDITION)) { \ RedisModule_ReplyWithError(ctx, (MESSAGE)); \ @@ -328,13 +315,6 @@ int TableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int arg return TableAdd_DoPublish(ctx, argv, argc); } -#if RAY_USE_NEW_GCS -int ChainTableAdd_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - return module.ChainReplicate(ctx, argv, argc, /*node_func=*/TableAdd_DoWrite, - /*tail_func=*/TableAdd_DoPublish); -} -#endif - int TableAppend_DoWrite(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, RedisModuleString **mutated_key_str) { if (argc < 5 || argc > 6) { @@ -435,15 +415,6 @@ int TableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int return TableAppend_DoPublish(ctx, argv, argc); } -#if RAY_USE_NEW_GCS -int ChainTableAppend_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, - int argc) { - return module.ChainReplicate(ctx, argv, argc, - /*node_func=*/TableAppend_DoWrite, - /*tail_func=*/TableAppend_DoPublish); -} -#endif - int Set_DoPublish(RedisModuleCtx *ctx, RedisModuleString **argv, bool is_add) { RedisModuleString *pubsub_channel_str = argv[2]; RedisModuleString *id = argv[3]; @@ -983,10 +954,6 @@ AUTO_MEMORY(TableRequestNotifications_RedisCommand); AUTO_MEMORY(TableDelete_RedisCommand); AUTO_MEMORY(TableCancelNotifications_RedisCommand); AUTO_MEMORY(DebugString_RedisCommand); -#if RAY_USE_NEW_GCS -AUTO_MEMORY(ChainTableAdd_RedisCommand); -AUTO_MEMORY(ChainTableAppend_RedisCommand); -#endif extern "C" { @@ -1055,19 +1022,6 @@ __declspec(dllexport) return REDISMODULE_ERR; } -#if RAY_USE_NEW_GCS - // Chain-enabled commands that depend on ray-project/credis. - if (RedisModule_CreateCommand(ctx, "ray.chain.table_add", ChainTableAdd_RedisCommand, - "write pubsub", 0, 0, 0) == REDISMODULE_ERR) { - return REDISMODULE_ERR; - } - if (RedisModule_CreateCommand(ctx, "ray.chain.table_append", - ChainTableAppend_RedisCommand, "write pubsub", 0, 0, - 0) == REDISMODULE_ERR) { - return REDISMODULE_ERR; - } -#endif - return REDISMODULE_OK; }