[Core] Error info pubsub (Remove ray.errors API) (#9665)

kisuke95 2020-08-04 14:04:29 +08:00 committed by GitHub
parent 8c3fc1db76
commit 28b1f7710c
39 changed files with 312 additions and 602 deletions
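For orientation, here is a minimal consumer-side sketch of the flow this commit introduces: instead of calling the removed ray.errors() API, a driver pattern-subscribes to the new "ERROR_INFO:*" Redis channels and decodes each message as a PubSubMessage whose data field holds a serialized ErrorTableData, mirroring the helpers added in this diff. The polling loop and the 10-second budget below are illustrative only, not part of the commit.

import time
import ray

ray.init()

# Pattern-subscribe to the per-job error channels ("ERROR_INFO:<job id hex>").
p = ray.worker.global_worker.redis_client.pubsub(ignore_subscribe_messages=True)
p.psubscribe(ray.gcs_utils.RAY_ERROR_PUBSUB_PATTERN)

deadline = time.time() + 10
while time.time() < deadline:
    msg = p.get_message()
    if msg is None:
        time.sleep(0.01)
        continue
    # Each payload is a PubSubMessage wrapping a serialized ErrorTableData.
    pubsub_msg = ray.gcs_utils.PubSubMessage.FromString(msg["data"])
    error_data = ray.gcs_utils.ErrorTableData.FromString(pubsub_msg.data)
    print(error_data.type, error_data.error_message)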

View file

@ -158,13 +158,6 @@ ray.available_resources
.. autofunction:: ray.available_resources
.. _ray-errors-ref:
ray.errors
~~~~~~~~~~
.. autofunction:: ray.errors
Experimental APIs
-----------------

View file

@ -77,7 +77,7 @@ _config = _Config()
from ray.profiling import profile # noqa: E402
from ray.state import (jobs, nodes, actors, objects, timeline,
object_transfer_timeline, cluster_resources,
available_resources, errors) # noqa: E402
available_resources) # noqa: E402
from ray.worker import (
LOCAL_MODE,
SCRIPT_MODE,
@ -122,7 +122,6 @@ __all__ = [
"object_transfer_timeline",
"cluster_resources",
"available_resources",
"errors",
"LOCAL_MODE",
"PYTHON_MODE",
"SCRIPT_MODE",

View file

@ -175,8 +175,8 @@ class NodeStats(threading.Thread):
p.subscribe(log_channel)
logger.info("NodeStats: subscribed to {}".format(log_channel))
error_channel = ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB")
p.subscribe(error_channel)
error_channel = ray.gcs_utils.RAY_ERROR_PUBSUB_PATTERN
p.psubscribe(error_channel)
logger.info("NodeStats: subscribed to {}".format(error_channel))
actor_channel = ray.gcs_utils.RAY_ACTOR_PUBSUB_PATTERN
@ -211,9 +211,10 @@ class NodeStats(threading.Thread):
pid = str(data["pid"])
self._logs[ip][pid].extend(data["lines"])
elif channel == str(error_channel):
gcs_entry = ray.gcs_utils.GcsEntry.FromString(data)
pubsub_msg = ray.gcs_utils.PubSubMessage.FromString(
data)
error_data = ray.gcs_utils.ErrorTableData.FromString(
gcs_entry.entries[0])
pubsub_msg.data)
message = error_data.error_message
message = re.sub(r"\x1b\[\d+m", "", message)
match = re.search(r"\(pid=(\d+), ip=(.*?)\)", message)

View file

@ -60,13 +60,14 @@ RAY_ACTOR_PUBSUB_PATTERN = "ACTOR:*".encode("ascii")
# Reporter pub/sub updates
RAY_REPORTER_PUBSUB_PATTERN = "RAY_REPORTER.*".encode("ascii")
RAY_ERROR_PUBSUB_PATTERN = "ERROR_INFO:*".encode("ascii")
# These prefixes must be kept up-to-date with the TablePrefix enum in
# gcs.proto.
# TODO(rkn): We should use scoped enums, in which case we should be able to
# just access the flatbuffer generated values.
TablePrefix_RAYLET_TASK_string = "RAYLET_TASK"
TablePrefix_OBJECT_string = "OBJECT"
TablePrefix_ERROR_INFO_string = "ERROR_INFO"
TablePrefix_PROFILE_string = "PROFILE"
TablePrefix_JOB_string = "JOB"
TablePrefix_ACTOR_string = "ACTOR"
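Note how this pattern pairs with the publish side elsewhere in the commit: errors are now published to a per-job channel named "ERROR_INFO:" followed by the job ID in hex (see the Python utils and GCS C++ changes below), so a single psubscribe on this pattern receives errors for every job. A tiny illustrative check of that pairing, using a hypothetical job ID hex and Python's fnmatch as a stand-in for Redis glob matching:

import fnmatch

pattern = "ERROR_INFO:*"              # RAY_ERROR_PUBSUB_PATTERN, decoded from ASCII
channel = "ERROR_INFO:" + "01000000"  # hypothetical job ID hex, as built on the publish side
assert fnmatch.fnmatch(channel, pattern)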

View file

@ -760,67 +760,6 @@ class GlobalState:
return dict(total_available_resources)
def _error_messages(self, job_id):
"""Get the error messages for a specific driver.
Args:
job_id: The ID of the job to get the errors for.
Returns:
A list of the error messages for this driver.
"""
assert isinstance(job_id, ray.JobID)
message = self.redis_client.execute_command(
"RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("ERROR_INFO"), "",
job_id.binary())
# If there are no errors, return early.
if message is None:
return []
gcs_entries = gcs_utils.GcsEntry.FromString(message)
error_messages = []
for entry in gcs_entries.entries:
error_data = gcs_utils.ErrorTableData.FromString(entry)
assert job_id.binary() == error_data.job_id
error_message = {
"type": error_data.type,
"message": error_data.error_message,
"timestamp": error_data.timestamp,
}
error_messages.append(error_message)
return error_messages
def error_messages(self, job_id=None):
"""Get the error messages for all drivers or a specific driver.
Args:
job_id: The specific job to get the errors for. If this is
None, then this method retrieves the errors for all jobs.
Returns:
A list of the error messages for the specified driver if one was
given, or a dictionary mapping from job ID to a list of error
messages for that driver otherwise.
"""
self._check_connected()
if job_id is not None:
assert isinstance(job_id, ray.JobID)
return self._error_messages(job_id)
error_table_keys = self.redis_client.keys(
gcs_utils.TablePrefix_ERROR_INFO_string + "*")
job_ids = [
key[len(gcs_utils.TablePrefix_ERROR_INFO_string):]
for key in error_table_keys
]
return {
binary_to_hex(job_id): self._error_messages(ray.JobID(job_id))
for job_id in job_ids
}
def actor_checkpoint_info(self, actor_id):
"""Get checkpoint info for the given actor id.
Args:
@ -1001,24 +940,3 @@ def available_resources():
resource in the cluster.
"""
return state.available_resources()
def errors(all_jobs=False):
"""Get error messages from the cluster.
Args:
all_jobs: False if we should only include error messages for this
specific job, or True if we should include error messages for all
jobs.
Returns:
Error messages pushed from the cluster. This will be a single list if
all_jobs is False, or a dictionary mapping from job ID to a list of
error messages for that job if all_jobs is True.
"""
if not all_jobs:
worker = ray.worker.global_worker
error_messages = state.error_messages(job_id=worker.current_job_id)
else:
error_messages = state.error_messages(job_id=None)
return error_messages

View file

@ -208,17 +208,6 @@ def run_string_as_driver_nonblocking(driver_script):
return proc
def flat_errors():
errors = []
for job_errors in ray.errors(all_jobs=True).values():
errors.extend(job_errors)
return errors
def relevant_errors(error_type):
return [error for error in flat_errors() if error["type"] == error_type]
def wait_for_num_actors(num_actors, timeout=10):
start_time = time.time()
while time.time() - start_time < timeout:
@ -228,16 +217,6 @@ def wait_for_num_actors(num_actors, timeout=10):
raise RayTestTimeoutException("Timed out while waiting for global state.")
def wait_for_errors(error_type, num_errors, timeout=20):
start_time = time.time()
while time.time() - start_time < timeout:
if len(relevant_errors(error_type)) >= num_errors:
return
time.sleep(0.1)
raise RayTestTimeoutException("Timed out waiting for {} {} errors.".format(
num_errors, error_type))
def wait_for_condition(condition_predictor, timeout=30, retry_interval_ms=100):
"""Wait until a condition is met or time out with an exception.
@ -404,3 +383,31 @@ def get_other_nodes(cluster, exclude_head=False):
def get_non_head_nodes(cluster):
"""Get all non-head nodes."""
return list(filter(lambda x: x.head is False, cluster.list_all_nodes()))
def init_error_pubsub():
"""Initialize redis error info pub/sub"""
p = ray.worker.global_worker.redis_client.pubsub(
ignore_subscribe_messages=True)
error_pubsub_channel = ray.gcs_utils.RAY_ERROR_PUBSUB_PATTERN
p.psubscribe(error_pubsub_channel)
return p
def get_error_message(pub_sub, num, error_type=None, timeout=10):
"""Get errors through pub/sub."""
start_time = time.time()
msgs = []
while time.time() - start_time < timeout and len(msgs) < num:
msg = pub_sub.get_message()
if msg is None:
time.sleep(0.01)
continue
pubsub_msg = ray.gcs_utils.PubSubMessage.FromString(msg["data"])
error_data = ray.gcs_utils.ErrorTableData.FromString(pubsub_msg.data)
if error_type is None or error_type == error_data.type:
msgs.append(error_data)
else:
time.sleep(0.01)
return msgs
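A brief usage sketch of these two helpers, assuming a test that deliberately triggers a task error; the failing remote function and its message are illustrative, not part of the diff:

import ray
import ray.ray_constants as ray_constants
from ray.test_utils import init_error_pubsub, get_error_message

ray.init(num_cpus=1)
p = init_error_pubsub()

@ray.remote
def fail():
    raise Exception("boom")

fail.remote()
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert "boom" in errors[0].error_message
p.close()
ray.shutdown()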

View file

@ -9,6 +9,7 @@ import subprocess
import ray
from ray.cluster_utils import Cluster
from ray.test_utils import init_error_pubsub
@pytest.fixture
@ -209,3 +210,10 @@ def two_node_cluster():
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
@pytest.fixture()
def error_pubsub():
p = init_error_pubsub()
yield p
p.close()

View file

@ -12,13 +12,12 @@ import ray.ray_constants as ray_constants
import ray.test_utils
import ray.cluster_utils
from ray.test_utils import (
relevant_errors,
wait_for_condition,
wait_for_errors,
wait_for_pid_to_exit,
generate_internal_config_map,
get_other_nodes,
SignalActor,
get_error_message,
)
SIGKILL = signal.SIGKILL if sys.platform != "win32" else signal.SIGTERM
@ -627,10 +626,12 @@ def test_checkpointing_on_node_failure(ray_start_cluster_2_nodes,
@pytest.mark.skip(reason="TODO: Actor checkpointing")
def test_checkpointing_save_exception(ray_start_regular,
def test_checkpointing_save_exception(ray_start_regular, error_pubsub,
ray_checkpointable_actor_cls):
"""Test actor can still be recovered if checkpoints fail to complete."""
p = error_pubsub
@ray.remote(max_restarts=2)
class RemoteCheckpointableActor(ray_checkpointable_actor_cls):
def save_checkpoint(self, actor_id, checkpoint_context):
@ -663,14 +664,18 @@ def test_checkpointing_save_exception(ray_start_regular,
assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False
# Check that the checkpoint error was pushed to the driver.
wait_for_errors(ray_constants.CHECKPOINT_PUSH_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.CHECKPOINT_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.CHECKPOINT_PUSH_ERROR
@pytest.mark.skip(reason="TODO: Actor checkpointing")
def test_checkpointing_load_exception(ray_start_regular,
def test_checkpointing_load_exception(ray_start_regular, error_pubsub,
ray_checkpointable_actor_cls):
"""Test actor can still be recovered if checkpoints fail to load."""
p = error_pubsub
@ray.remote(max_restarts=2)
class RemoteCheckpointableActor(ray_checkpointable_actor_cls):
def load_checkpoint(self, actor_id, checkpoints):
@ -704,7 +709,9 @@ def test_checkpointing_load_exception(ray_start_regular,
assert ray.get(actor.was_resumed_from_checkpoint.remote()) is False
# Check that the checkpoint error was pushed to the driver.
wait_for_errors(ray_constants.CHECKPOINT_PUSH_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.CHECKPOINT_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.CHECKPOINT_PUSH_ERROR
@pytest.mark.parametrize(
@ -759,14 +766,16 @@ def test_bad_checkpointable_actor_class():
return True
def test_init_exception_in_checkpointable_actor(ray_start_regular,
ray_checkpointable_actor_cls):
def test_init_exception_in_checkpointable_actor(
ray_start_regular, error_pubsub, ray_checkpointable_actor_cls):
# This test is similar to test_failure.py::test_failed_actor_init.
# This test is used to guarantee that checkpointable actor does not
# break the same logic.
error_message1 = "actor constructor failed"
error_message2 = "actor method failed"
p = error_pubsub
@ray.remote
class CheckpointableFailedActor(ray_checkpointable_actor_cls):
def __init__(self):
@ -781,17 +790,15 @@ def test_init_exception_in_checkpointable_actor(ray_start_regular,
a = CheckpointableFailedActor.remote()
# Make sure that we get errors from a failed constructor.
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1)
errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert error_message1 in errors[0]["message"]
assert error_message1 in errors[0].error_message
# Make sure that we get errors from a failed method.
a.fail_method.remote()
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2)
errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 2
assert error_message1 in errors[1]["message"]
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert error_message1 in errors[0].error_message
def test_decorated_method(ray_start_regular):

View file

@ -349,31 +349,6 @@ def test_ray_setproctitle(ray_start_2_cpus):
ray.get(unique_1.remote())
def test_duplicate_error_messages(shutdown_only):
ray.init(num_cpus=0)
driver_id = ray.WorkerID.nil()
error_data = ray.gcs_utils.construct_error_message(driver_id, "test",
"message", 0)
# Push the same message to the GCS twice (they are the same because we
# do not include a timestamp).
r = ray.worker.global_worker.redis_client
r.execute_command("RAY.TABLE_APPEND",
ray.gcs_utils.TablePrefix.Value("ERROR_INFO"),
ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB"),
driver_id.binary(), error_data)
# Before https://github.com/ray-project/ray/pull/3316 this would
# give an error
r.execute_command("RAY.TABLE_APPEND",
ray.gcs_utils.TablePrefix.Value("ERROR_INFO"),
ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB"),
driver_id.binary(), error_data)
@pytest.mark.skipif(
os.getenv("TRAVIS") is None,
reason="This test should only be run on Travis.")

View file

@ -14,15 +14,14 @@ import ray
import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.test_utils import (
relevant_errors,
wait_for_condition,
wait_for_errors,
RayTestTimeoutException,
SignalActor,
init_error_pubsub,
get_error_message,
)
def test_failed_task(ray_start_regular):
def test_failed_task(ray_start_regular, error_pubsub):
@ray.remote
def throw_exception_fct1():
raise Exception("Test function 1 intentionally failed.")
@ -35,13 +34,15 @@ def test_failed_task(ray_start_regular):
def throw_exception_fct3(x):
raise Exception("Test function 3 intentionally failed.")
p = error_pubsub
throw_exception_fct1.remote()
throw_exception_fct1.remote()
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2)
assert len(relevant_errors(ray_constants.TASK_PUSH_ERROR)) == 2
for task in relevant_errors(ray_constants.TASK_PUSH_ERROR):
msg = task.get("message")
assert "Test function 1 intentionally failed." in msg
msgs = get_error_message(p, 2, ray_constants.TASK_PUSH_ERROR)
assert len(msgs) == 2
for msg in msgs:
assert "Test function 1 intentionally failed." in msg.error_message
x = throw_exception_fct2.remote()
try:
@ -120,7 +121,8 @@ def test_get_throws_quickly_when_found_exception(ray_start_regular):
ray.get(signal2.send.remote())
def test_fail_importing_remote_function(ray_start_2_cpus):
def test_fail_importing_remote_function(ray_start_2_cpus, error_pubsub):
p = error_pubsub
# Create the contents of a temporary Python file.
temporary_python_file = """
def temporary_helper_function():
@ -150,11 +152,11 @@ def temporary_helper_function():
# Invoke the function so that the definition is exported.
g.remote(1, y=2)
wait_for_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR, 2)
errors = relevant_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR)
assert len(errors) >= 2, errors
assert "No module named" in errors[0]["message"]
assert "No module named" in errors[1]["message"]
errors = get_error_message(
p, 2, ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR)
assert errors[0].type == ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR
assert "No module named" in errors[0].error_message
assert "No module named" in errors[1].error_message
# Check that if we try to call the function it throws an exception and
# does not hang.
@ -164,26 +166,28 @@ def temporary_helper_function():
ray.get(g.remote(1, y=2))
f.close()
# Clean up the junk we added to sys.path.
sys.path.pop(-1)
def test_failed_function_to_run(ray_start_2_cpus):
def test_failed_function_to_run(ray_start_2_cpus, error_pubsub):
p = error_pubsub
def f(worker):
if ray.worker.global_worker.mode == ray.WORKER_MODE:
raise Exception("Function to run failed.")
ray.worker.global_worker.run_function_on_all_workers(f)
wait_for_errors(ray_constants.FUNCTION_TO_RUN_PUSH_ERROR, 2)
# Check that the error message is in the task info.
errors = relevant_errors(ray_constants.FUNCTION_TO_RUN_PUSH_ERROR)
errors = get_error_message(p, 2, ray_constants.FUNCTION_TO_RUN_PUSH_ERROR)
assert len(errors) == 2
assert "Function to run failed." in errors[0]["message"]
assert "Function to run failed." in errors[1]["message"]
assert errors[0].type == ray_constants.FUNCTION_TO_RUN_PUSH_ERROR
assert "Function to run failed." in errors[0].error_message
assert "Function to run failed." in errors[1].error_message
def test_fail_importing_actor(ray_start_regular):
def test_fail_importing_actor(ray_start_regular, error_pubsub):
p = error_pubsub
# Create the contents of a temporary Python file.
temporary_python_file = """
def temporary_helper_function():
@ -210,21 +214,22 @@ def temporary_helper_function():
return 1
# There should be no errors yet.
assert len(ray.errors()) == 0
errors = get_error_message(p, 2)
assert len(errors) == 0
# Create an actor.
foo = Foo.remote(3, arg2=0)
# Wait for the error to arrive.
wait_for_errors(ray_constants.REGISTER_ACTOR_PUSH_ERROR, 1)
errors = relevant_errors(ray_constants.REGISTER_ACTOR_PUSH_ERROR)
assert "No module named" in errors[0]["message"]
errors = get_error_message(p, 2)
assert len(errors) == 2
# Wait for the error from when the __init__ tries to run.
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1)
errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
assert ("failed to be imported, and so cannot execute this method" in
errors[0]["message"])
for error in errors:
# Wait for the error to arrive.
if error.type == ray_constants.REGISTER_ACTOR_PUSH_ERROR:
assert "No module named" in error.error_message
else:
# Wait for the error from when the __init__ tries to run.
assert ("failed to be imported, and so cannot execute this method"
in error.error_message)
# Check that if we try to get the function it throws an exception and
# does not hang.
@ -232,10 +237,11 @@ def temporary_helper_function():
ray.get(foo.get_val.remote(1, arg2=2))
# Wait for the error from the call to get_val.
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2)
errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
assert ("failed to be imported, and so cannot execute this method" in
errors[1]["message"])
errors[0].error_message)
f.close()
@ -243,7 +249,8 @@ def temporary_helper_function():
sys.path.pop(-1)
def test_failed_actor_init(ray_start_regular):
def test_failed_actor_init(ray_start_regular, error_pubsub):
p = error_pubsub
error_message1 = "actor constructor failed"
error_message2 = "actor method failed"
@ -258,20 +265,21 @@ def test_failed_actor_init(ray_start_regular):
a = FailedActor.remote()
# Make sure that we get errors from a failed constructor.
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1)
errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert error_message1 in errors[0]["message"]
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
assert error_message1 in errors[0].error_message
# Make sure that we get errors from a failed method.
a.fail_method.remote()
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2)
errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 2
assert error_message1 in errors[1]["message"]
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
assert error_message1 in errors[0].error_message
def test_failed_actor_method(ray_start_regular):
def test_failed_actor_method(ray_start_regular, error_pubsub):
p = error_pubsub
error_message2 = "actor method failed"
@ray.remote
@ -286,10 +294,10 @@ def test_failed_actor_method(ray_start_regular):
# Make sure that we get errors from a failed method.
a.fail_method.remote()
wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1)
errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert error_message2 in errors[0]["message"]
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
assert error_message2 in errors[0].error_message
def test_incorrect_method_calls(ray_start_regular):
@ -328,7 +336,9 @@ def test_incorrect_method_calls(ray_start_regular):
a.nonexistent_method.remote()
def test_worker_raising_exception(ray_start_regular):
def test_worker_raising_exception(ray_start_regular, error_pubsub):
p = error_pubsub
@ray.remote(max_calls=2)
def f():
# This is the only reasonable variable we can set here that makes the
@ -339,12 +349,15 @@ def test_worker_raising_exception(ray_start_regular):
# Running this task should cause the worker to raise an exception after
# the task has successfully completed.
f.remote()
wait_for_errors(ray_constants.WORKER_CRASH_PUSH_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.WORKER_CRASH_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.WORKER_CRASH_PUSH_ERROR
def test_worker_dying(ray_start_regular):
def test_worker_dying(ray_start_regular, error_pubsub):
p = error_pubsub
# Define a remote function that will kill the worker that runs it.
@ray.remote(max_retries=0)
def f():
eval("exit()")
@ -352,14 +365,15 @@ def test_worker_dying(ray_start_regular):
with pytest.raises(ray.exceptions.RayWorkerError):
ray.get(f.remote())
wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1)
errors = relevant_errors(ray_constants.WORKER_DIED_PUSH_ERROR)
errors = get_error_message(p, 1, ray_constants.WORKER_DIED_PUSH_ERROR)
assert len(errors) == 1
assert "died or was killed while executing" in errors[0]["message"]
assert errors[0].type == ray_constants.WORKER_DIED_PUSH_ERROR
assert "died or was killed while executing" in errors[0].error_message
def test_actor_worker_dying(ray_start_regular):
def test_actor_worker_dying(ray_start_regular, error_pubsub):
p = error_pubsub
@ray.remote
class Actor:
def kill(self):
@ -375,10 +389,14 @@ def test_actor_worker_dying(ray_start_regular):
ray.get(obj)
with pytest.raises(ray.exceptions.RayTaskError):
ray.get(consume.remote(obj))
wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.WORKER_DIED_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.WORKER_DIED_PUSH_ERROR
def test_actor_worker_dying_future_tasks(ray_start_regular):
def test_actor_worker_dying_future_tasks(ray_start_regular, error_pubsub):
p = error_pubsub
@ray.remote(max_restarts=0)
class Actor:
def getpid(self):
@ -397,7 +415,9 @@ def test_actor_worker_dying_future_tasks(ray_start_regular):
with pytest.raises(Exception):
ray.get(obj)
wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.WORKER_DIED_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.WORKER_DIED_PUSH_ERROR
def test_actor_worker_dying_nothing_in_progress(ray_start_regular):
@ -415,7 +435,10 @@ def test_actor_worker_dying_nothing_in_progress(ray_start_regular):
ray.get(task2)
def test_actor_scope_or_intentionally_killed_message(ray_start_regular):
def test_actor_scope_or_intentionally_killed_message(ray_start_regular,
error_pubsub):
p = error_pubsub
@ray.remote
class Actor:
pass
@ -424,15 +447,16 @@ def test_actor_scope_or_intentionally_killed_message(ray_start_regular):
a = Actor.remote()
a.__ray_terminate__.remote()
time.sleep(1)
assert len(
ray.errors()) == 0, ("Should not have propogated an error - {}".format(
ray.errors()))
errors = get_error_message(p, 1)
assert len(errors) == 0, "Should not have propogated an error - {}".format(
errors)
@pytest.mark.skip("This test does not work yet.")
@pytest.mark.parametrize(
"ray_start_object_store_memory", [10**6], indirect=True)
def test_put_error1(ray_start_object_store_memory):
def test_put_error1(ray_start_object_store_memory, error_pubsub):
p = error_pubsub
num_objects = 3
object_size = 4 * 10**5
@ -470,7 +494,10 @@ def test_put_error1(ray_start_object_store_memory):
put_arg_task.remote()
# Make sure we receive the correct error message.
wait_for_errors(ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR, 1)
errors = get_error_message(p, 1,
ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR
@pytest.mark.skip("This test does not work yet.")
@ -514,22 +541,28 @@ def test_put_error2(ray_start_object_store_memory):
put_task.remote()
# Make sure we receive the correct error message.
wait_for_errors(ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR, 1)
# get_error_message(ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR, 1)
def test_version_mismatch(shutdown_only):
@pytest.mark.skip("Publish happeds before we subscribe it")
def test_version_mismatch(error_pubsub, shutdown_only):
ray_version = ray.__version__
ray.__version__ = "fake ray version"
ray.init(num_cpus=1)
p = error_pubsub
wait_for_errors(ray_constants.VERSION_MISMATCH_PUSH_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.VERSION_MISMATCH_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.VERSION_MISMATCH_PUSH_ERROR
# Reset the version.
ray.__version__ = ray_version
def test_export_large_objects(ray_start_regular):
def test_export_large_objects(ray_start_regular, error_pubsub):
p = error_pubsub
import ray.ray_constants as ray_constants
large_object = np.zeros(2 * ray_constants.PICKLE_OBJECT_WARNING_SIZE)
@ -542,7 +575,10 @@ def test_export_large_objects(ray_start_regular):
f.remote()
# Make sure that a warning is generated.
wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 1)
errors = get_error_message(p, 1,
ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR
@ray.remote
class Foo:
@ -552,11 +588,15 @@ def test_export_large_objects(ray_start_regular):
Foo.remote()
# Make sure that a warning is generated.
wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2)
errors = get_error_message(p, 1,
ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR
@pytest.mark.skip(reason="TODO detect resource deadlock")
def test_warning_for_resource_deadlock(shutdown_only):
def test_warning_for_resource_deadlock(error_pubsub, shutdown_only):
p = error_pubsub
# Check that we get warning messages for infeasible tasks.
ray.init(num_cpus=1)
@ -574,10 +614,13 @@ def test_warning_for_resource_deadlock(shutdown_only):
# Run in a task to check we handle the blocked task case correctly
f.remote()
wait_for_errors(ray_constants.RESOURCE_DEADLOCK_ERROR, 1, timeout=30)
errors = get_error_message(p, 1, ray_constants.RESOURCE_DEADLOCK_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.RESOURCE_DEADLOCK_ERROR
def test_warning_for_infeasible_tasks(ray_start_regular):
def test_warning_for_infeasible_tasks(ray_start_regular, error_pubsub):
p = error_pubsub
# Check that we get warning messages for infeasible tasks.
@ray.remote(num_gpus=1)
@ -590,11 +633,15 @@ def test_warning_for_infeasible_tasks(ray_start_regular):
# This task is infeasible.
f.remote()
wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.INFEASIBLE_TASK_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR
# This actor placement task is infeasible.
Foo.remote()
wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 2)
errors = get_error_message(p, 1, ray_constants.INFEASIBLE_TASK_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR
def test_warning_for_infeasible_zero_cpu_actor(shutdown_only):
@ -603,6 +650,7 @@ def test_warning_for_infeasible_zero_cpu_actor(shutdown_only):
# requires no CPUs).
ray.init(num_cpus=0)
p = init_error_pubsub()
@ray.remote
class Foo:
@ -610,7 +658,10 @@ def test_warning_for_infeasible_zero_cpu_actor(shutdown_only):
# The actor creation should be infeasible.
Foo.remote()
wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.INFEASIBLE_TASK_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR
p.close()
def test_warning_for_too_many_actors(shutdown_only):
@ -619,15 +670,23 @@ def test_warning_for_too_many_actors(shutdown_only):
num_cpus = 2
ray.init(num_cpus=num_cpus)
p = init_error_pubsub()
@ray.remote
class Foo:
def __init__(self):
time.sleep(1000)
[Foo.remote() for _ in range(num_cpus * 3)]
wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR
[Foo.remote() for _ in range(num_cpus)]
wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 2)
errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR
p.close()
def test_warning_for_too_many_nested_tasks(shutdown_only):
@ -635,6 +694,7 @@ def test_warning_for_too_many_nested_tasks(shutdown_only):
# started that we will receive a warning.
num_cpus = 2
ray.init(num_cpus=num_cpus)
p = init_error_pubsub()
@ray.remote
def f():
@ -654,7 +714,10 @@ def test_warning_for_too_many_nested_tasks(shutdown_only):
ray.get(h.remote())
[g.remote() for _ in range(num_cpus * 4)]
wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR
p.close()
def test_warning_for_many_duplicate_remote_functions_and_actors(shutdown_only):
@ -786,9 +849,10 @@ def test_redis_module_failure(ray_start_regular):
# Note that this test will take at least 10 seconds because it must wait for
# the monitor to detect enough missed heartbeats.
def test_warning_for_dead_node(ray_start_cluster_2_nodes):
def test_warning_for_dead_node(ray_start_cluster_2_nodes, error_pubsub):
cluster = ray_start_cluster_2_nodes
cluster.wait_for_nodes()
p = error_pubsub
node_ids = {item["NodeID"] for item in ray.nodes()}
@ -801,14 +865,11 @@ def test_warning_for_dead_node(ray_start_cluster_2_nodes):
cluster.list_all_nodes()[0].kill_raylet()
# Check that we get warning messages for both raylets.
wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=40)
errors = get_error_message(p, 2, ray_constants.REMOVED_NODE_ERROR, 40)
# Extract the client IDs from the error messages. This will need to be
# changed if the error message changes.
warning_node_ids = {
item["message"].split(" ")[5]
for item in relevant_errors(ray_constants.REMOVED_NODE_ERROR)
}
warning_node_ids = {error.error_message.split(" ")[5] for error in errors}
assert node_ids == warning_node_ids
@ -837,24 +898,28 @@ def test_connect_with_disconnected_node(shutdown_only):
cluster = Cluster()
cluster.add_node(num_cpus=0, _internal_config=config)
ray.init(address=cluster.address)
info = relevant_errors(ray_constants.REMOVED_NODE_ERROR)
assert len(info) == 0
p = init_error_pubsub()
errors = get_error_message(p, 1, timeout=5)
assert len(errors) == 0
# This node is killed by SIGKILL, ray_monitor will mark it as dead.
dead_node = cluster.add_node(num_cpus=0)
cluster.remove_node(dead_node, allow_graceful=False)
wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 1)
errors = get_error_message(p, 1, ray_constants.REMOVED_NODE_ERROR)
assert len(errors) == 1
# This node is killed by SIGKILL, ray_monitor will mark it as dead.
dead_node = cluster.add_node(num_cpus=0)
cluster.remove_node(dead_node, allow_graceful=False)
wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2)
errors = get_error_message(p, 1, ray_constants.REMOVED_NODE_ERROR)
assert len(errors) == 1
# This node is killed by SIGTERM, ray_monitor will not mark it again.
removing_node = cluster.add_node(num_cpus=0)
cluster.remove_node(removing_node, allow_graceful=True)
with pytest.raises(RayTestTimeoutException):
wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 3, timeout=2)
errors = get_error_message(p, 1, timeout=2)
assert len(errors) == 0
# There is no connection error to a dead node.
info = relevant_errors(ray_constants.RAYLET_CONNECTION_ERROR)
assert len(info) == 0
errors = get_error_message(p, 1, timeout=2)
assert len(errors) == 0
p.close()
@pytest.mark.parametrize(

View file

@ -8,16 +8,19 @@ import ray
from ray.test_utils import (
RayTestTimeoutException, check_call_ray, run_string_as_driver,
run_string_as_driver_nonblocking, wait_for_children_of_pid,
wait_for_children_of_pid_to_exit, kill_process_by_name, Semaphore)
wait_for_children_of_pid_to_exit, kill_process_by_name, Semaphore,
init_error_pubsub, get_error_message)
def test_error_isolation(call_ray_start):
address = call_ray_start
# Connect a driver to the Ray cluster.
ray.init(address=address)
p = init_error_pubsub()
# There shouldn't be any errors yet.
assert len(ray.errors()) == 0
errors = get_error_message(p, 1, timeout=2)
assert len(errors) == 0
error_string1 = "error_string1"
error_string2 = "error_string2"
@ -31,13 +34,11 @@ def test_error_isolation(call_ray_start):
ray.get(f.remote())
# Wait for the error to appear in Redis.
while len(ray.errors()) != 1:
time.sleep(0.1)
print("Waiting for error to appear.")
errors = get_error_message(p, 1)
# Make sure we got the error.
assert len(ray.errors()) == 1
assert error_string1 in ray.errors()[0]["message"]
assert len(errors) == 1
assert error_string1 in errors[0].error_message
# Start another driver and make sure that it does not receive this
# error. Make the other driver throw an error, and make sure it
@ -45,11 +46,13 @@ def test_error_isolation(call_ray_start):
driver_script = """
import ray
import time
from ray.test_utils import (init_error_pubsub, get_error_message)
ray.init(address="{}")
p = init_error_pubsub()
time.sleep(1)
assert len(ray.errors()) == 0
errors = get_error_message(p, 1, timeout=2)
assert len(errors) == 0
@ray.remote
def f():
@ -60,12 +63,10 @@ try:
except Exception as e:
pass
while len(ray.errors()) != 1:
print(len(ray.errors()))
time.sleep(0.1)
assert len(ray.errors()) == 1
errors = get_error_message(p, 1)
assert len(errors) == 1
assert "{}" in ray.errors()[0]["message"]
assert "{}" in errors[0].error_message
print("success")
""".format(address, error_string2, error_string2)
@ -76,8 +77,9 @@ print("success")
# Make sure that the other error message doesn't show up for this
# driver.
assert len(ray.errors()) == 1
assert error_string1 in ray.errors()[0]["message"]
errors = get_error_message(p, 1)
assert len(errors) == 1
p.close()
def test_remote_function_isolation(call_ray_start):

View file

@ -7,8 +7,8 @@ import time
import ray
from ray.cluster_utils import Cluster
from ray.test_utils import flat_errors
import ray.ray_constants as ray_constants
from ray.test_utils import get_error_message
@pytest.fixture(params=[1, 4])
@ -203,12 +203,12 @@ def test_multiple_recursive(ray_start_reconstruction):
assert cluster.remaining_processes_alive()
def wait_for_errors(error_check):
def wait_for_errors(p, error_check):
# Wait for errors from all the nondeterministic tasks.
errors = []
time_left = 100
while time_left > 0:
errors = flat_errors()
errors.extend(get_error_message(p, 1))
if error_check(errors):
break
time_left -= 1
@ -223,7 +223,8 @@ def wait_for_errors(error_check):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
def test_nondeterministic_task(ray_start_reconstruction):
def test_nondeterministic_task(ray_start_reconstruction, error_pubsub):
p = error_pubsub
plasma_store_memory, num_nodes, cluster = ray_start_reconstruction
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
@ -280,9 +281,9 @@ def test_nondeterministic_task(ray_start_reconstruction):
min_errors = 1
return len(errors) >= min_errors
errors = wait_for_errors(error_check)
errors = wait_for_errors(p, error_check)
# Make sure all the errors have the correct type.
assert all(error["type"] == ray_constants.HASH_MISMATCH_PUSH_ERROR
assert all(error.type == ray_constants.HASH_MISMATCH_PUSH_ERROR
for error in errors)
assert cluster.remaining_processes_alive()
@ -293,7 +294,8 @@ def test_nondeterministic_task(ray_start_reconstruction):
reason="Failing with new GCS API on Linux.")
@pytest.mark.parametrize(
"ray_start_object_store_memory", [10**9], indirect=True)
def test_driver_put_errors(ray_start_object_store_memory):
def test_driver_put_errors(ray_start_object_store_memory, error_pubsub):
p = error_pubsub
plasma_store_memory = ray_start_object_store_memory
# Define the size of one task's return argument so that the combined
# sum of all objects' sizes is at least twice the plasma stores'
@ -333,10 +335,11 @@ def test_driver_put_errors(ray_start_object_store_memory):
def error_check(errors):
return len(errors) > 1
errors = wait_for_errors(error_check)
assert all(error["type"] == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR
or "ray.exceptions.UnreconstructableError" in error["message"]
for error in errors)
errors = wait_for_errors(p, error_check)
assert all(
error.type == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR
or "ray.exceptions.UnreconstructableError" in error.error_messages
for error in errors)
# NOTE(swang): This test tries to launch 1000 workers and breaks.

View file

@ -117,10 +117,11 @@ def push_error_to_driver_through_redis(redis_client,
# of through the raylet.
error_data = ray.gcs_utils.construct_error_message(job_id, error_type,
message, time.time())
redis_client.execute_command(
"RAY.TABLE_APPEND", ray.gcs_utils.TablePrefix.Value("ERROR_INFO"),
ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB"), job_id.binary(),
error_data)
pubsub_msg = ray.gcs_utils.PubSubMessage()
pubsub_msg.id = job_id.hex()
pubsub_msg.data = error_data
redis_client.publish("ERROR_INFO:" + job_id.hex(),
pubsub_msg.SerializeAsString())
def is_cython(obj):

View file

@ -1095,16 +1095,11 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped):
# Really we should just subscribe to the errors for this specific job.
# However, currently all errors seem to be published on the same channel.
error_pubsub_channel = str(
ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB")).encode("ascii")
worker.error_message_pubsub_client.subscribe(error_pubsub_channel)
# worker.error_message_pubsub_client.psubscribe("*")
error_pubsub_channel = ray.gcs_utils.RAY_ERROR_PUBSUB_PATTERN
worker.error_message_pubsub_client.psubscribe(error_pubsub_channel)
try:
# Get the errors that occurred before the call to subscribe.
error_messages = ray.errors()
for error_message in error_messages:
logger.error(error_message)
while True:
# Exit if we received a signal that we should stop.
@ -1115,10 +1110,9 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped):
if msg is None:
threads_stopped.wait(timeout=0.01)
continue
gcs_entry = ray.gcs_utils.GcsEntry.FromString(msg["data"])
assert len(gcs_entry.entries) == 1
pubsub_msg = ray.gcs_utils.PubSubMessage.FromString(msg["data"])
error_data = ray.gcs_utils.ErrorTableData.FromString(
gcs_entry.entries[0])
pubsub_msg.data)
job_id = error_data.job_id
if job_id not in [
worker.current_job_id.binary(),

View file

@ -1328,21 +1328,12 @@ ServiceBasedErrorInfoAccessor::ServiceBasedErrorInfoAccessor(
Status ServiceBasedErrorInfoAccessor::AsyncReportJobError(
const std::shared_ptr<rpc::ErrorTableData> &data_ptr,
const StatusCallback &callback) {
JobID job_id = JobID::FromBinary(data_ptr->job_id());
std::string type = data_ptr->type();
RAY_LOG(DEBUG) << "Reporting job error, job id = " << job_id << ", type = " << type;
rpc::ReportJobErrorRequest request;
request.mutable_error_data()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportJobError(
request, [job_id, type, callback](const Status &status,
const rpc::ReportJobErrorReply &reply) {
if (callback) {
callback(status);
}
RAY_LOG(DEBUG) << "Finished reporting job error, status = " << status
<< ", job id = " << job_id << ", type = " << type;
});
return Status::OK();
auto job_id = JobID::FromBinary(data_ptr->job_id());
RAY_LOG(DEBUG) << "Publishing job error, job id = " << job_id;
Status status = client_impl_->GetGcsPubSub().Publish(
ERROR_INFO_CHANNEL, job_id.Hex(), data_ptr->SerializeAsString(), callback);
RAY_LOG(DEBUG) << "Finished publishing job error, job id = " << job_id;
return status;
}
ServiceBasedWorkerInfoAccessor::ServiceBasedWorkerInfoAccessor(

View file

@ -1,45 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "ray/gcs/gcs_server/error_info_handler_impl.h"
namespace ray {
namespace rpc {
void DefaultErrorInfoHandler::HandleReportJobError(
const ReportJobErrorRequest &request, ReportJobErrorReply *reply,
SendReplyCallback send_reply_callback) {
JobID job_id = JobID::FromBinary(request.error_data().job_id());
std::string type = request.error_data().type();
RAY_LOG(DEBUG) << "Reporting job error, job id = " << job_id << ", type = " << type;
auto error_table_data = std::make_shared<ErrorTableData>();
error_table_data->CopyFrom(request.error_data());
auto on_done = [job_id, type, reply, send_reply_callback](Status status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to report job error, job id = " << job_id
<< ", type = " << type;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Errors().AsyncReportJobError(error_table_data, on_done);
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished reporting job error, job id = " << job_id
<< ", type = " << type;
}
} // namespace rpc
} // namespace ray

View file

@ -1,38 +0,0 @@
// Copyright 2017 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
namespace ray {
namespace rpc {
/// This implementation class of `ErrorInfoHandler`.
class DefaultErrorInfoHandler : public rpc::ErrorInfoHandler {
public:
explicit DefaultErrorInfoHandler(gcs::RedisGcsClient &gcs_client)
: gcs_client_(gcs_client) {}
void HandleReportJobError(const ReportJobErrorRequest &request,
ReportJobErrorReply *reply,
SendReplyCallback send_reply_callback) override;
private:
gcs::RedisGcsClient &gcs_client_;
};
} // namespace rpc
} // namespace ray

View file

@ -140,11 +140,9 @@ void GcsNodeManager::NodeFailureDetector::ScheduleTick() {
//////////////////////////////////////////////////////////////////////////////////////////
GcsNodeManager::GcsNodeManager(boost::asio::io_service &main_io_service,
boost::asio::io_service &node_failure_detector_io_service,
gcs::ErrorInfoAccessor &error_info_accessor,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage)
: error_info_accessor_(error_info_accessor),
main_io_service_(main_io_service),
: main_io_service_(main_io_service),
node_failure_detector_(new NodeFailureDetector(
node_failure_detector_io_service, gcs_table_storage, gcs_pub_sub,
[this](const ClientID &node_id) {
@ -406,7 +404,8 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
<< " has missed too many heartbeats from it.";
auto error_data_ptr =
gcs::CreateErrorTableData(type, error_message.str(), current_time_ms());
RAY_CHECK_OK(error_info_accessor_.AsyncReportJobError(error_data_ptr, nullptr));
RAY_CHECK_OK(gcs_pub_sub_->Publish(ERROR_INFO_CHANNEL, node_id.Hex(),
error_data_ptr->SerializeAsString(), nullptr));
}
// Notify all listeners.

View file

@ -36,13 +36,11 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
///
/// \param main_io_service The main event loop.
/// \param node_failure_detector_io_service The event loop of node failure detector.
/// \param error_info_accessor The error info accessor, which is used to report error.
/// \param gcs_pub_sub GCS message publisher.
/// \param gcs_table_storage GCS table external storage accessor.
/// when detecting the death of nodes.
explicit GcsNodeManager(boost::asio::io_service &main_io_service,
boost::asio::io_service &node_failure_detector_io_service,
gcs::ErrorInfoAccessor &error_info_accessor,
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub,
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage);
@ -219,8 +217,6 @@ class GcsNodeManager : public rpc::NodeInfoHandler {
};
private:
/// Error info accessor.
gcs::ErrorInfoAccessor &error_info_accessor_;
/// The main event loop for node failure detector.
boost::asio::io_service &main_io_service_;
/// Detector to detect the failure of node.

View file

@ -16,7 +16,6 @@
#include "ray/common/network_util.h"
#include "ray/common/ray_config.h"
#include "ray/gcs/gcs_server/error_info_handler_impl.h"
#include "ray/gcs/gcs_server/gcs_actor_manager.h"
#include "ray/gcs/gcs_server/gcs_job_manager.h"
#include "ray/gcs/gcs_server/gcs_node_manager.h"
@ -95,11 +94,6 @@ void GcsServer::Start() {
stats_service_.reset(new rpc::StatsGrpcService(main_service_, *stats_handler_));
rpc_server_.RegisterService(*stats_service_);
error_info_handler_ = InitErrorInfoHandler();
error_info_service_.reset(
new rpc::ErrorInfoGrpcService(main_service_, *error_info_handler_));
rpc_server_.RegisterService(*error_info_service_);
gcs_worker_manager_ = InitGcsWorkerManager();
worker_info_service_.reset(
new rpc::WorkerInfoGrpcService(main_service_, *gcs_worker_manager_));
@ -166,8 +160,7 @@ void GcsServer::InitGcsNodeManager() {
node_manager_io_service_.run();
}));
gcs_node_manager_ = std::make_shared<GcsNodeManager>(
main_service_, node_manager_io_service_, redis_gcs_client_->Errors(), gcs_pub_sub_,
gcs_table_storage_);
main_service_, node_manager_io_service_, gcs_pub_sub_, gcs_table_storage_);
}
void GcsServer::InitGcsActorManager() {
@ -279,11 +272,6 @@ std::unique_ptr<rpc::StatsHandler> GcsServer::InitStatsHandler() {
new rpc::DefaultStatsHandler(gcs_table_storage_));
}
std::unique_ptr<rpc::ErrorInfoHandler> GcsServer::InitErrorInfoHandler() {
return std::unique_ptr<rpc::DefaultErrorInfoHandler>(
new rpc::DefaultErrorInfoHandler(*redis_gcs_client_));
}
std::unique_ptr<GcsWorkerManager> GcsServer::InitGcsWorkerManager() {
return std::unique_ptr<GcsWorkerManager>(
new GcsWorkerManager(gcs_table_storage_, gcs_pub_sub_));

View file

@ -98,9 +98,6 @@ class GcsServer {
/// The stats handler
virtual std::unique_ptr<rpc::StatsHandler> InitStatsHandler();
/// The error info handler
virtual std::unique_ptr<rpc::ErrorInfoHandler> InitErrorInfoHandler();
/// The worker manager
virtual std::unique_ptr<GcsWorkerManager> InitGcsWorkerManager();
@ -148,9 +145,6 @@ class GcsServer {
/// Stats handler and service
std::unique_ptr<rpc::StatsHandler> stats_handler_;
std::unique_ptr<rpc::StatsGrpcService> stats_service_;
/// Error info handler and service
std::unique_ptr<rpc::ErrorInfoHandler> error_info_handler_;
std::unique_ptr<rpc::ErrorInfoGrpcService> error_info_service_;
/// The gcs worker manager
std::unique_ptr<GcsWorkerManager> gcs_worker_manager_;
/// Worker info service

View file

@ -269,14 +269,6 @@ class GcsHeartbeatBatchTable : public GcsTable<ClientID, HeartbeatBatchTableData
}
};
class GcsErrorInfoTable : public GcsTable<JobID, ErrorTableData> {
public:
explicit GcsErrorInfoTable(std::shared_ptr<StoreClient> &store_client)
: GcsTable(store_client) {
table_name_ = TablePrefix_Name(TablePrefix::ERROR_INFO);
}
};
class GcsProfileTable : public GcsTable<UniqueID, ProfileTableData> {
public:
explicit GcsProfileTable(std::shared_ptr<StoreClient> &store_client)
@ -377,11 +369,6 @@ class GcsTableStorage {
return *heartbeat_batch_table_;
}
GcsErrorInfoTable &ErrorInfoTable() {
RAY_CHECK(error_info_table_ != nullptr);
return *error_info_table_;
}
GcsProfileTable &ProfileTable() {
RAY_CHECK(profile_table_ != nullptr);
return *profile_table_;
@ -413,7 +400,6 @@ class GcsTableStorage {
std::unique_ptr<GcsPlacementGroupScheduleTable> placement_group_schedule_table_;
std::unique_ptr<GcsHeartbeatTable> heartbeat_table_;
std::unique_ptr<GcsHeartbeatBatchTable> heartbeat_batch_table_;
std::unique_ptr<GcsErrorInfoTable> error_info_table_;
std::unique_ptr<GcsProfileTable> profile_table_;
std::unique_ptr<GcsWorkerTable> worker_table_;
std::unique_ptr<GcsInternalConfigTable> internal_config_table_;
@ -443,7 +429,6 @@ class RedisGcsTableStorage : public GcsTableStorage {
placement_group_schedule_table_.reset(
new GcsPlacementGroupScheduleTable(store_client_));
heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_));
error_info_table_.reset(new GcsErrorInfoTable(store_client_));
profile_table_.reset(new GcsProfileTable(store_client_));
worker_table_.reset(new GcsWorkerTable(store_client_));
internal_config_table_.reset(new GcsInternalConfigTable(store_client_));
@ -472,7 +457,6 @@ class InMemoryGcsTableStorage : public GcsTableStorage {
new GcsPlacementGroupScheduleTable(store_client_));
heartbeat_table_.reset(new GcsHeartbeatTable(store_client_));
heartbeat_batch_table_.reset(new GcsHeartbeatBatchTable(store_client_));
error_info_table_.reset(new GcsErrorInfoTable(store_client_));
profile_table_.reset(new GcsProfileTable(store_client_));
worker_table_.reset(new GcsWorkerTable(store_client_));
internal_config_table_.reset(new GcsInternalConfigTable(store_client_));

View file

@ -28,7 +28,7 @@ class GcsActorSchedulerTest : public ::testing::Test {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(redis_client_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, io_service_, error_info_accessor_, gcs_pub_sub_, gcs_table_storage_);
io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_actor_table_ =
std::make_shared<GcsServerMocker::MockedGcsActorTable>(store_client_);
@ -52,9 +52,6 @@ class GcsActorSchedulerTest : public ::testing::Test {
boost::asio::io_service io_service_;
std::shared_ptr<gcs::StoreClient> store_client_;
std::shared_ptr<GcsServerMocker::MockedGcsActorTable> gcs_actor_table_;
GcsServerMocker::MockedErrorInfoAccessor error_info_accessor_;
std::shared_ptr<GcsServerMocker::MockRayletClient> raylet_client_;
std::shared_ptr<GcsServerMocker::MockWorkerClient> worker_client_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;

View file

@ -20,16 +20,21 @@
namespace ray {
class GcsNodeManagerTest : public ::testing::Test {
public:
GcsNodeManagerTest() {
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
}
protected:
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;
std::shared_ptr<GcsServerMocker::MockGcsPubSub> gcs_pub_sub_;
std::shared_ptr<gcs::RedisClient> redis_client_;
std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
};
TEST_F(GcsNodeManagerTest, TestManagement) {
boost::asio::io_service io_service;
auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor();
gcs::GcsNodeManager node_manager(io_service, io_service, error_info_accessor,
gcs_pub_sub_, gcs_table_storage_);
gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_,
gcs_table_storage_);
// Test Add/Get/Remove functionality.
auto node = Mocker::GenNodeInfo();
auto node_id = ClientID::FromBinary(node->node_id());
@ -43,9 +48,8 @@ TEST_F(GcsNodeManagerTest, TestManagement) {
TEST_F(GcsNodeManagerTest, TestListener) {
boost::asio::io_service io_service;
auto error_info_accessor = GcsServerMocker::MockedErrorInfoAccessor();
gcs::GcsNodeManager node_manager(io_service, io_service, error_info_accessor,
gcs_pub_sub_, gcs_table_storage_);
gcs::GcsNodeManager node_manager(io_service, io_service, gcs_pub_sub_,
gcs_table_storage_);
// Test AddNodeAddedListener.
int node_count = 1000;
std::vector<std::shared_ptr<rpc::GcsNodeInfo>> added_nodes;

View file

@ -55,7 +55,7 @@ class GcsObjectManagerTest : public ::testing::Test {
void SetUp() override {
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, io_service_, error_info_accessor_, gcs_pub_sub_, gcs_table_storage_);
io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_);
gcs_object_manager_ = std::make_shared<MockedGcsObjectManager>(
gcs_table_storage_, gcs_pub_sub_, *gcs_node_manager_);
GenTestData();
@ -83,7 +83,6 @@ class GcsObjectManagerTest : public ::testing::Test {
protected:
boost::asio::io_service io_service_;
GcsServerMocker::MockedErrorInfoAccessor error_info_accessor_;
std::shared_ptr<gcs::GcsNodeManager> gcs_node_manager_;
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
std::shared_ptr<gcs::GcsPubSub> gcs_pub_sub_;

View file

@ -27,7 +27,7 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
gcs_table_storage_ = std::make_shared<gcs::RedisGcsTableStorage>(redis_client_);
gcs_pub_sub_ = std::make_shared<GcsServerMocker::MockGcsPubSub>(redis_client_);
gcs_node_manager_ = std::make_shared<gcs::GcsNodeManager>(
io_service_, io_service_, error_info_accessor_, gcs_pub_sub_, gcs_table_storage_);
io_service_, io_service_, gcs_pub_sub_, gcs_table_storage_);
gcs_table_storage_ = std::make_shared<gcs::InMemoryGcsTableStorage>(io_service_);
store_client_ = std::make_shared<gcs::InMemoryStoreClient>(io_service_);
gcs_placement_group_scheduler_ =
@ -39,7 +39,6 @@ class GcsPlacementGroupSchedulerTest : public ::testing::Test {
protected:
boost::asio::io_service io_service_;
GcsServerMocker::MockedErrorInfoAccessor error_info_accessor_;
std::shared_ptr<gcs::StoreClient> store_client_;
std::shared_ptr<GcsServerMocker::MockRayletResourceClient> raylet_client_;

View file

@ -375,16 +375,6 @@ class GcsServerTest : public ::testing::Test {
return WaitReady(promise.get_future(), timeout_ms_);
}
bool ReportJobError(const rpc::ReportJobErrorRequest &request) {
std::promise<bool> promise;
client_->ReportJobError(
request, [&promise](const Status &status, const rpc::ReportJobErrorReply &reply) {
RAY_CHECK_OK(status);
promise.set_value(true);
});
return WaitReady(promise.get_future(), timeout_ms_);
}
bool ReportWorkerFailure(const rpc::ReportWorkerFailureRequest &request) {
std::promise<bool> promise;
client_->ReportWorkerFailure(
@ -755,16 +745,6 @@ TEST_F(GcsServerTest, TestStats) {
ASSERT_TRUE(AddProfileData(add_profile_data_request));
}
TEST_F(GcsServerTest, TestErrorInfo) {
// Report error
rpc::ReportJobErrorRequest report_error_request;
rpc::ErrorTableData error_table_data;
JobID job_id = JobID::FromInt(1);
error_table_data.set_job_id(job_id.Binary());
report_error_request.mutable_error_data()->CopyFrom(error_table_data);
ASSERT_TRUE(ReportJobError(report_error_request));
}
TEST_F(GcsServerTest, TestWorkerInfo) {
// Report worker failure
auto worker_failure_data = Mocker::GenWorkerTableData();

View file

@ -369,17 +369,6 @@ struct GcsServerMocker {
void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
};
class MockedErrorInfoAccessor : public gcs::ErrorInfoAccessor {
public:
Status AsyncReportJobError(const std::shared_ptr<rpc::ErrorTableData> &data_ptr,
const gcs::StatusCallback &callback) override {
if (callback) {
callback(Status::OK());
}
return Status::OK();
}
};
class MockGcsPubSub : public gcs::GcsPubSub {
public:
MockGcsPubSub(std::shared_ptr<gcs::RedisClient> redis_client)

View file

@ -34,6 +34,7 @@ namespace gcs {
#define TASK_LEASE_CHANNEL "TASK_LEASE"
#define HEARTBEAT_CHANNEL "HEARTBEAT"
#define HEARTBEAT_BATCH_CHANNEL "HEARTBEAT_BATCH"
#define ERROR_INFO_CHANNEL "ERROR_INFO"
/// \class GcsPubSub
///

View file

@ -763,20 +763,11 @@ Status RedisNodeInfoAccessor::AsyncSubscribeToResources(
return resource_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), on_subscribe, done);
}
RedisErrorInfoAccessor::RedisErrorInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl) {}
RedisErrorInfoAccessor::RedisErrorInfoAccessor(RedisGcsClient *client_impl) {}
Status RedisErrorInfoAccessor::AsyncReportJobError(
const std::shared_ptr<ErrorTableData> &data_ptr, const StatusCallback &callback) {
ErrorTable::WriteCallback on_done = nullptr;
if (callback != nullptr) {
on_done = [callback](RedisGcsClient *client, const JobID &job_id,
const ErrorTableData &data) { callback(Status::OK()); };
}
JobID job_id = JobID::FromBinary(data_ptr->job_id());
ErrorTable &error_table = client_impl_->error_table();
return error_table.Append(job_id, job_id, data_ptr, on_done);
return Status::Invalid("Not implemented");
}
RedisStatsInfoAccessor::RedisStatsInfoAccessor(RedisGcsClient *client_impl)

View file

@ -414,9 +414,6 @@ class RedisErrorInfoAccessor : public ErrorInfoAccessor {
Status AsyncReportJobError(const std::shared_ptr<ErrorTableData> &data_ptr,
const StatusCallback &callback) override;
private:
RedisGcsClient *client_impl_{nullptr};
};
/// \class RedisStatsInfoAccessor

View file

@ -54,7 +54,6 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
// For raylet, NodeID should be initialized in raylet layer(not here).
client_table_.reset(new ClientTable({primary_context}, this));
error_table_.reset(new ErrorTable({primary_context}, this));
job_table_.reset(new JobTable({primary_context}, this));
heartbeat_batch_table_.reset(new HeartbeatBatchTable({primary_context}, this));
// Tables below would be sharded.
@ -107,7 +106,6 @@ std::string RedisGcsClient::DebugString() const {
result << "\n- TaskReconstructionLog: " << task_reconstruction_log_->DebugString();
result << "\n- TaskLeaseTable: " << task_lease_table_->DebugString();
result << "\n- HeartbeatTable: " << heartbeat_table_->DebugString();
result << "\n- ErrorTable: " << error_table_->DebugString();
result << "\n- ProfileTable: " << profile_table_->DebugString();
result << "\n- ClientTable: " << client_table_->DebugString();
result << "\n- JobTable: " << job_table_->DebugString();
@ -140,8 +138,6 @@ HeartbeatBatchTable &RedisGcsClient::heartbeat_batch_table() {
return *heartbeat_batch_table_;
}
ErrorTable &RedisGcsClient::error_table() { return *error_table_; }
JobTable &RedisGcsClient::job_table() { return *job_table_; }
ProfileTable &RedisGcsClient::profile_table() { return *profile_table_; }

View file

@ -101,9 +101,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
virtual raylet::TaskTable &raylet_task_table();
TaskLeaseTable &task_lease_table();
TaskReconstructionLog &task_reconstruction_log();
/// Implements the Errors() interface.
// TODO: Some API for getting the error on the driver
ErrorTable &error_table();
/// Implements the Stats() interface.
ProfileTable &profile_table();
/// Implements the Workers() interface.
@ -124,7 +121,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
std::unique_ptr<TaskLeaseTable> task_lease_table_;
std::unique_ptr<HeartbeatTable> heartbeat_table_;
std::unique_ptr<HeartbeatBatchTable> heartbeat_batch_table_;
std::unique_ptr<ErrorTable> error_table_;
std::unique_ptr<ProfileTable> profile_table_;
std::unique_ptr<ClientTable> client_table_;
std::unique_ptr<ActorCheckpointTable> actor_checkpoint_table_;

View file

@ -532,10 +532,6 @@ Status Hash<ID, Data>::Subscribe(const JobID &job_id, const ClientID &client_id,
return Status::OK();
}
std::string ErrorTable::DebugString() const {
return Log<JobID, ErrorTableData>::DebugString();
}
std::string ProfileTable::DebugString() const {
return Log<UniqueID, ProfileTableData>::DebugString();
}
@ -869,7 +865,6 @@ template class Log<TaskID, TaskReconstructionData>;
template class Table<TaskID, TaskLeaseData>;
template class Table<ClientID, HeartbeatTableData>;
template class Table<ClientID, HeartbeatBatchTableData>;
template class Log<JobID, ErrorTableData>;
template class Log<ClientID, GcsNodeInfo>;
template class Log<JobID, JobTableData>;
template class Log<UniqueID, ProfileTableData>;

View file

@ -860,21 +860,6 @@ class TaskTable : public Table<TaskID, TaskTableData> {
} // namespace raylet
class ErrorTable : public Log<JobID, ErrorTableData> {
public:
ErrorTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
RedisGcsClient *client)
: Log(contexts, client) {
pubsub_channel_ = TablePubsub::ERROR_INFO_PUBSUB;
prefix_ = TablePrefix::ERROR_INFO;
};
/// Returns debug string for class.
///
/// \return string.
std::string DebugString() const;
};
class ProfileTable : public Log<UniqueID, ProfileTableData> {
public:
ProfileTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,

View file

@ -33,20 +33,19 @@ enum TablePrefix {
TASK_RECONSTRUCTION = 8;
HEARTBEAT = 9;
HEARTBEAT_BATCH = 10;
ERROR_INFO = 11;
JOB = 12;
PROFILE = 13;
TASK_LEASE = 14;
ACTOR_CHECKPOINT = 15;
ACTOR_CHECKPOINT_ID = 16;
NODE_RESOURCE = 17;
DIRECT_ACTOR = 18;
JOB = 11;
PROFILE = 12;
TASK_LEASE = 13;
ACTOR_CHECKPOINT = 14;
ACTOR_CHECKPOINT_ID = 15;
NODE_RESOURCE = 16;
DIRECT_ACTOR = 17;
// WORKER is already used in WorkerType, so use WORKERS here.
WORKERS = 19;
INTERNAL_CONFIG = 20;
TABLE_PREFIX_MAX = 21;
PLACEMENT_GROUP_SCHEDULE = 22;
PLACEMENT_GROUP = 23;
WORKERS = 18;
INTERNAL_CONFIG = 19;
TABLE_PREFIX_MAX = 20;
PLACEMENT_GROUP_SCHEDULE = 21;
PLACEMENT_GROUP = 22;
}
// The channel that Add operations to the Table should be published on, if any.
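With ERROR_INFO dropped from TablePrefix, the remaining prefixes shift down by one (the TablePubsub enum in the next hunk is renumbered the same way). A quick interpreter sketch against the regenerated Python bindings, purely illustrative:

import ray.gcs_utils as gcs_utils

# After this change JOB takes the old ERROR_INFO slot.
assert gcs_utils.TablePrefix.Value("JOB") == 11
assert gcs_utils.TablePrefix.Value("PROFILE") == 12

# The removed name is no longer a valid enum member.
try:
    gcs_utils.TablePrefix.Value("ERROR_INFO")
except ValueError:
    pass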
@ -60,13 +59,12 @@ enum TablePubsub {
ACTOR_PUBSUB = 6;
HEARTBEAT_PUBSUB = 7;
HEARTBEAT_BATCH_PUBSUB = 8;
ERROR_INFO_PUBSUB = 9;
TASK_LEASE_PUBSUB = 10;
JOB_PUBSUB = 11;
NODE_RESOURCE_PUBSUB = 12;
DIRECT_ACTOR_PUBSUB = 13;
WORKER_FAILURE_PUBSUB = 14;
TABLE_PUBSUB_MAX = 15;
TASK_LEASE_PUBSUB = 9;
JOB_PUBSUB = 10;
NODE_RESOURCE_PUBSUB = 11;
DIRECT_ACTOR_PUBSUB = 12;
WORKER_FAILURE_PUBSUB = 13;
TABLE_PUBSUB_MAX = 14;
}
enum GcsChangeMode {

View file

@ -425,20 +425,6 @@ service StatsGcsService {
rpc GetAllProfileInfo(GetAllProfileInfoRequest) returns (GetAllProfileInfoReply);
}
message ReportJobErrorRequest {
ErrorTableData error_data = 1;
}
message ReportJobErrorReply {
GcsStatus status = 1;
}
// Service for error info access.
service ErrorInfoGcsService {
// Report a job error to GCS Service.
rpc ReportJobError(ReportJobErrorRequest) returns (ReportJobErrorReply);
}
message ReportWorkerFailureRequest {
WorkerTableData worker_failure = 1;
}

View file

@ -103,8 +103,6 @@ class GcsRpcClient {
new GrpcClient<TaskInfoGcsService>(address, port, client_call_manager));
stats_grpc_client_ = std::unique_ptr<GrpcClient<StatsGcsService>>(
new GrpcClient<StatsGcsService>(address, port, client_call_manager));
error_info_grpc_client_ = std::unique_ptr<GrpcClient<ErrorInfoGcsService>>(
new GrpcClient<ErrorInfoGcsService>(address, port, client_call_manager));
worker_info_grpc_client_ = std::unique_ptr<GrpcClient<WorkerInfoGcsService>>(
new GrpcClient<WorkerInfoGcsService>(address, port, client_call_manager));
placement_group_info_grpc_client_ =
@ -233,10 +231,6 @@ class GcsRpcClient {
/// Get information of all profiles from GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(StatsGcsService, GetAllProfileInfo, stats_grpc_client_, )
/// Report a job error to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(ErrorInfoGcsService, ReportJobError,
error_info_grpc_client_, )
/// Report a worker failure to GCS Service.
VOID_GCS_RPC_CLIENT_METHOD(WorkerInfoGcsService, ReportWorkerFailure,
worker_info_grpc_client_, )
@ -267,7 +261,6 @@ class GcsRpcClient {
std::unique_ptr<GrpcClient<ObjectInfoGcsService>> object_info_grpc_client_;
std::unique_ptr<GrpcClient<TaskInfoGcsService>> task_info_grpc_client_;
std::unique_ptr<GrpcClient<StatsGcsService>> stats_grpc_client_;
std::unique_ptr<GrpcClient<ErrorInfoGcsService>> error_info_grpc_client_;
std::unique_ptr<GrpcClient<WorkerInfoGcsService>> worker_info_grpc_client_;
std::unique_ptr<GrpcClient<PlacementGroupInfoGcsService>>
placement_group_info_grpc_client_;

View file

@ -38,9 +38,6 @@ namespace rpc {
#define STATS_SERVICE_RPC_HANDLER(HANDLER) RPC_SERVICE_HANDLER(StatsGcsService, HANDLER)
#define ERROR_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(ErrorInfoGcsService, HANDLER)
#define WORKER_INFO_SERVICE_RPC_HANDLER(HANDLER) \
RPC_SERVICE_HANDLER(WorkerInfoGcsService, HANDLER)
@ -402,41 +399,6 @@ class StatsGrpcService : public GrpcService {
StatsGcsServiceHandler &service_handler_;
};
class ErrorInfoGcsServiceHandler {
public:
virtual ~ErrorInfoGcsServiceHandler() = default;
virtual void HandleReportJobError(const ReportJobErrorRequest &request,
ReportJobErrorReply *reply,
SendReplyCallback send_reply_callback) = 0;
};
/// The `GrpcService` for `ErrorInfoGcsService`.
class ErrorInfoGrpcService : public GrpcService {
public:
/// Constructor.
///
/// \param[in] handler The service handler that actually handle the requests.
explicit ErrorInfoGrpcService(boost::asio::io_service &io_service,
ErrorInfoGcsServiceHandler &handler)
: GrpcService(io_service), service_handler_(handler){};
protected:
grpc::Service &GetGrpcService() override { return service_; }
void InitServerCallFactories(
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories) override {
ERROR_INFO_SERVICE_RPC_HANDLER(ReportJobError);
}
private:
/// The grpc async service object.
ErrorInfoGcsService::AsyncService service_;
/// The service handler that actually handle the requests.
ErrorInfoGcsServiceHandler &service_handler_;
};
class WorkerInfoGcsServiceHandler {
public:
virtual ~WorkerInfoGcsServiceHandler() = default;
@ -528,7 +490,6 @@ using NodeInfoHandler = NodeInfoGcsServiceHandler;
using ObjectInfoHandler = ObjectInfoGcsServiceHandler;
using TaskInfoHandler = TaskInfoGcsServiceHandler;
using StatsHandler = StatsGcsServiceHandler;
using ErrorInfoHandler = ErrorInfoGcsServiceHandler;
using WorkerInfoHandler = WorkerInfoGcsServiceHandler;
using PlacementGroupInfoHandler = PlacementGroupInfoGcsServiceHandler;