From 1f29a960f4fd21b7b43706aa2f75f8188cad22b5 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 31 Oct 2018 12:52:50 -0700 Subject: [PATCH] Update task_table and object_table API. (#3161) * Update task_table and object_table API. * Fix --- python/ray/experimental/state.py | 92 ++++++++++++++++---------------- python/ray/monitor.py | 9 ++-- test/runtest.py | 85 ++--------------------------- 3 files changed, 54 insertions(+), 132 deletions(-) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index cf3d18283..177de89d2 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -162,22 +162,26 @@ class GlobalState(object): message = self._execute_command(object_id, "RAY.TABLE_LOOKUP", ray.gcs_utils.TablePrefix.OBJECT, "", object_id.id()) - result = [] gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry( message, 0) - for i in range(gcs_entry.EntriesLength()): + assert gcs_entry.EntriesLength() > 0 + + entry = ray.gcs_utils.ObjectTableData.GetRootAsObjectTableData( + gcs_entry.Entries(0), 0) + + object_info = { + "DataSize": entry.ObjectSize(), + "Manager": entry.Manager(), + "IsEviction": [entry.IsEviction()], + } + + for i in range(1, gcs_entry.EntriesLength()): entry = ray.gcs_utils.ObjectTableData.GetRootAsObjectTableData( gcs_entry.Entries(i), 0) - object_info = { - "DataSize": entry.ObjectSize(), - "Manager": entry.Manager(), - "IsEviction": entry.IsEviction(), - "NumEvictions": entry.NumEvictions() - } - result.append(object_info) + object_info["IsEviction"].append(entry.IsEviction()) - return result + return object_info def object_table(self, object_id=None): """Fetch and parse the object table info for one or more object IDs. @@ -224,44 +228,42 @@ class GlobalState(object): gcs_entries = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry( message, 0) - info = [] - for i in range(gcs_entries.EntriesLength()): - task_table_message = ray.gcs_utils.Task.GetRootAsTask( - gcs_entries.Entries(i), 0) + assert gcs_entries.EntriesLength() == 1 - execution_spec = task_table_message.TaskExecutionSpec() - task_spec = task_table_message.TaskSpecification() - task_spec = ray.raylet.task_from_string(task_spec) - task_spec_info = { - "DriverID": binary_to_hex(task_spec.driver_id().id()), - "TaskID": binary_to_hex(task_spec.task_id().id()), - "ParentTaskID": binary_to_hex(task_spec.parent_task_id().id()), - "ParentCounter": task_spec.parent_counter(), - "ActorID": binary_to_hex(task_spec.actor_id().id()), - "ActorCreationID": binary_to_hex( - task_spec.actor_creation_id().id()), - "ActorCreationDummyObjectID": binary_to_hex( - task_spec.actor_creation_dummy_object_id().id()), - "ActorCounter": task_spec.actor_counter(), - "FunctionID": binary_to_hex(task_spec.function_id().id()), - "Args": task_spec.arguments(), - "ReturnObjectIDs": task_spec.returns(), - "RequiredResources": task_spec.required_resources() - } + task_table_message = ray.gcs_utils.Task.GetRootAsTask( + gcs_entries.Entries(0), 0) - info.append({ - "ExecutionSpec": { - "Dependencies": [ - execution_spec.Dependencies(i) - for i in range(execution_spec.DependenciesLength()) - ], - "LastTimestamp": execution_spec.LastTimestamp(), - "NumForwards": execution_spec.NumForwards() - }, - "TaskSpec": task_spec_info - }) + execution_spec = task_table_message.TaskExecutionSpec() + task_spec = task_table_message.TaskSpecification() + task_spec = ray.raylet.task_from_string(task_spec) + task_spec_info = { + "DriverID": binary_to_hex(task_spec.driver_id().id()), + "TaskID": binary_to_hex(task_spec.task_id().id()), + "ParentTaskID": binary_to_hex(task_spec.parent_task_id().id()), + "ParentCounter": task_spec.parent_counter(), + "ActorID": binary_to_hex(task_spec.actor_id().id()), + "ActorCreationID": binary_to_hex( + task_spec.actor_creation_id().id()), + "ActorCreationDummyObjectID": binary_to_hex( + task_spec.actor_creation_dummy_object_id().id()), + "ActorCounter": task_spec.actor_counter(), + "FunctionID": binary_to_hex(task_spec.function_id().id()), + "Args": task_spec.arguments(), + "ReturnObjectIDs": task_spec.returns(), + "RequiredResources": task_spec.required_resources() + } - return info + return { + "ExecutionSpec": { + "Dependencies": [ + execution_spec.Dependencies(i) + for i in range(execution_spec.DependenciesLength()) + ], + "LastTimestamp": execution_spec.LastTimestamp(), + "NumForwards": execution_spec.NumForwards() + }, + "TaskSpec": task_spec_info + } def task_table(self, task_id=None): """Fetch and parse the task table information for one or more task IDs. diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 8094e3d5e..625641790 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -152,10 +152,8 @@ class Monitor(object): task_table_objects = self.state.task_table() driver_id_hex = binary_to_hex(driver_id) driver_task_id_bins = set() - for task_id_hex in task_table_objects: - if len(task_table_objects[task_id_hex]) == 0: - continue - task_table_object = task_table_objects[task_id_hex][0]["TaskSpec"] + for task_id_hex, task_info in task_table_objects.items(): + task_table_object = task_info["TaskSpec"] task_driver_id_hex = task_table_object["DriverID"] if driver_id_hex != task_driver_id_hex: # Ignore tasks that aren't from this driver. @@ -165,8 +163,7 @@ class Monitor(object): # Get objects associated with the driver. object_table_objects = self.state.object_table() driver_object_id_bins = set() - for object_id, object_table_object in object_table_objects.items(): - assert len(object_table_object) > 0 + for object_id, _ in object_table_objects.items(): task_id_bin = ray.raylet.compute_task_id(object_id).id() if task_id_bin in driver_task_id_bins: driver_object_id_bins.add(object_id.id()) diff --git a/test/runtest.py b/test/runtest.py index 353403626..10c4fa46d 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -2115,8 +2115,7 @@ def test_global_state_api(shutdown_only): task_table = ray.global_state.task_table() assert len(task_table) == 1 assert driver_task_id == list(task_table.keys())[0] - assert len(task_table[driver_task_id]) == 1 - task_spec = task_table[driver_task_id][0]["TaskSpec"] + task_spec = task_table[driver_task_id]["TaskSpec"] assert task_spec["TaskID"] == driver_task_id assert task_spec["ActorID"] == ray_constants.ID_SIZE * "ff" @@ -2147,7 +2146,7 @@ def test_global_state_api(shutdown_only): task_id = list(task_id_set)[0] function_table = ray.global_state.function_table() - task_spec = task_table[task_id][0]["TaskSpec"] + task_spec = task_table[task_id]["TaskSpec"] assert task_spec["ActorID"] == ray_constants.ID_SIZE * "ff" assert task_spec["Args"] == [1, "hi", x_id] assert task_spec["DriverID"] == driver_id @@ -2178,13 +2177,9 @@ def test_global_state_api(shutdown_only): object_table = ray.global_state.object_table() assert len(object_table) == 2 - assert len(object_table[x_id]) == 1 - assert object_table[x_id][0]["IsEviction"] is False - assert object_table[x_id][0]["NumEvictions"] == 0 + assert object_table[x_id]["IsEviction"][0] is False - assert len(object_table[result_id]) == 1 - assert object_table[result_id][0]["IsEviction"] is False - assert object_table[result_id][0]["NumEvictions"] == 0 + assert object_table[result_id]["IsEviction"][0] is False assert object_table[x_id] == ray.global_state.object_table(x_id) object_table_entry = ray.global_state.object_table(result_id) @@ -2251,78 +2246,6 @@ def test_workers(shutdown_only): assert "stdout_file" in info -@pytest.mark.skip("This test does not work yet.") -@pytest.mark.skipif( - os.environ.get("RAY_USE_NEW_GCS") == "on", - reason="New GCS API doesn't have a Python API yet.") -def test_flush_api(shutdown_only): - ray.init(num_cpus=1) - - @ray.remote - def f(): - return 1 - - [ray.put(1) for _ in range(10)] - ray.get([f.remote() for _ in range(10)]) - - # Wait until all of the task and object information has been stored in - # Redis. Note that since a given key may be updated multiple times - # (e.g., multiple calls to TaskTableUpdate), this is an attempt to wait - # until all updates have happened. Note that in a real application we - # could encounter this kind of issue as well. - while True: - object_table = ray.global_state.object_table() - task_table = ray.global_state.task_table() - - tables_ready = True - - if len(object_table) != 20: - tables_ready = False - - for object_info in object_table.values(): - if len(object_info) != 5: - tables_ready = False - if (object_info["ManagerIDs"] is None - or object_info["DataSize"] == -1 - or object_info["Hash"] == ""): - tables_ready = False - - if len(task_table) != 10 + 1: - tables_ready = False - - driver_task_id = ray.utils.binary_to_hex( - ray.worker.global_worker.current_task_id.id()) - - for info in task_table.values(): - if info["State"] != ray.experimental.state.TASK_STATUS_DONE: - if info["TaskSpec"]["TaskID"] != driver_task_id: - tables_ready = False - - if tables_ready: - break - # this test case is blocked sometimes, add this may fix the problem - time.sleep(0.1) - - # Flush the tables. - ray.experimental.flush_redis_unsafe() - ray.experimental.flush_task_and_object_metadata_unsafe() - - # Make sure the tables are empty. - assert len(ray.global_state.object_table()) == 0 - assert len(ray.global_state.task_table()) == 0 - - # Run some more tasks. - ray.get([f.remote() for _ in range(10)]) - - while len(ray.global_state.task_table()) != 0: - time.sleep(0.1) - ray.experimental.flush_finished_tasks_unsafe() - - # Make sure that we can call this method (but it won't do anything in - # this test case). - ray.experimental.flush_evicted_objects_unsafe() - - @pytest.fixture def shutdown_only_with_initialization_check(): yield None