Update task_table and object_table API. (#3161)

* Update task_table and object_table API.

* Fix
This commit is contained in:
Robert Nishihara 2018-10-31 12:52:50 -07:00 committed by Philipp Moritz
parent 9df2e6e6f4
commit 1f29a960f4
3 changed files with 54 additions and 132 deletions

View file

@ -162,22 +162,26 @@ class GlobalState(object):
message = self._execute_command(object_id, "RAY.TABLE_LOOKUP",
ray.gcs_utils.TablePrefix.OBJECT, "",
object_id.id())
result = []
gcs_entry = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
message, 0)
for i in range(gcs_entry.EntriesLength()):
assert gcs_entry.EntriesLength() > 0
entry = ray.gcs_utils.ObjectTableData.GetRootAsObjectTableData(
gcs_entry.Entries(0), 0)
object_info = {
"DataSize": entry.ObjectSize(),
"Manager": entry.Manager(),
"IsEviction": [entry.IsEviction()],
}
for i in range(1, gcs_entry.EntriesLength()):
entry = ray.gcs_utils.ObjectTableData.GetRootAsObjectTableData(
gcs_entry.Entries(i), 0)
object_info = {
"DataSize": entry.ObjectSize(),
"Manager": entry.Manager(),
"IsEviction": entry.IsEviction(),
"NumEvictions": entry.NumEvictions()
}
result.append(object_info)
object_info["IsEviction"].append(entry.IsEviction())
return result
return object_info
def object_table(self, object_id=None):
"""Fetch and parse the object table info for one or more object IDs.
@ -224,44 +228,42 @@ class GlobalState(object):
gcs_entries = ray.gcs_utils.GcsTableEntry.GetRootAsGcsTableEntry(
message, 0)
info = []
for i in range(gcs_entries.EntriesLength()):
task_table_message = ray.gcs_utils.Task.GetRootAsTask(
gcs_entries.Entries(i), 0)
assert gcs_entries.EntriesLength() == 1
execution_spec = task_table_message.TaskExecutionSpec()
task_spec = task_table_message.TaskSpecification()
task_spec = ray.raylet.task_from_string(task_spec)
task_spec_info = {
"DriverID": binary_to_hex(task_spec.driver_id().id()),
"TaskID": binary_to_hex(task_spec.task_id().id()),
"ParentTaskID": binary_to_hex(task_spec.parent_task_id().id()),
"ParentCounter": task_spec.parent_counter(),
"ActorID": binary_to_hex(task_spec.actor_id().id()),
"ActorCreationID": binary_to_hex(
task_spec.actor_creation_id().id()),
"ActorCreationDummyObjectID": binary_to_hex(
task_spec.actor_creation_dummy_object_id().id()),
"ActorCounter": task_spec.actor_counter(),
"FunctionID": binary_to_hex(task_spec.function_id().id()),
"Args": task_spec.arguments(),
"ReturnObjectIDs": task_spec.returns(),
"RequiredResources": task_spec.required_resources()
}
task_table_message = ray.gcs_utils.Task.GetRootAsTask(
gcs_entries.Entries(0), 0)
info.append({
"ExecutionSpec": {
"Dependencies": [
execution_spec.Dependencies(i)
for i in range(execution_spec.DependenciesLength())
],
"LastTimestamp": execution_spec.LastTimestamp(),
"NumForwards": execution_spec.NumForwards()
},
"TaskSpec": task_spec_info
})
execution_spec = task_table_message.TaskExecutionSpec()
task_spec = task_table_message.TaskSpecification()
task_spec = ray.raylet.task_from_string(task_spec)
task_spec_info = {
"DriverID": binary_to_hex(task_spec.driver_id().id()),
"TaskID": binary_to_hex(task_spec.task_id().id()),
"ParentTaskID": binary_to_hex(task_spec.parent_task_id().id()),
"ParentCounter": task_spec.parent_counter(),
"ActorID": binary_to_hex(task_spec.actor_id().id()),
"ActorCreationID": binary_to_hex(
task_spec.actor_creation_id().id()),
"ActorCreationDummyObjectID": binary_to_hex(
task_spec.actor_creation_dummy_object_id().id()),
"ActorCounter": task_spec.actor_counter(),
"FunctionID": binary_to_hex(task_spec.function_id().id()),
"Args": task_spec.arguments(),
"ReturnObjectIDs": task_spec.returns(),
"RequiredResources": task_spec.required_resources()
}
return info
return {
"ExecutionSpec": {
"Dependencies": [
execution_spec.Dependencies(i)
for i in range(execution_spec.DependenciesLength())
],
"LastTimestamp": execution_spec.LastTimestamp(),
"NumForwards": execution_spec.NumForwards()
},
"TaskSpec": task_spec_info
}
def task_table(self, task_id=None):
"""Fetch and parse the task table information for one or more task IDs.

View file

@ -152,10 +152,8 @@ class Monitor(object):
task_table_objects = self.state.task_table()
driver_id_hex = binary_to_hex(driver_id)
driver_task_id_bins = set()
for task_id_hex in task_table_objects:
if len(task_table_objects[task_id_hex]) == 0:
continue
task_table_object = task_table_objects[task_id_hex][0]["TaskSpec"]
for task_id_hex, task_info in task_table_objects.items():
task_table_object = task_info["TaskSpec"]
task_driver_id_hex = task_table_object["DriverID"]
if driver_id_hex != task_driver_id_hex:
# Ignore tasks that aren't from this driver.
@ -165,8 +163,7 @@ class Monitor(object):
# Get objects associated with the driver.
object_table_objects = self.state.object_table()
driver_object_id_bins = set()
for object_id, object_table_object in object_table_objects.items():
assert len(object_table_object) > 0
for object_id, _ in object_table_objects.items():
task_id_bin = ray.raylet.compute_task_id(object_id).id()
if task_id_bin in driver_task_id_bins:
driver_object_id_bins.add(object_id.id())

View file

@ -2115,8 +2115,7 @@ def test_global_state_api(shutdown_only):
task_table = ray.global_state.task_table()
assert len(task_table) == 1
assert driver_task_id == list(task_table.keys())[0]
assert len(task_table[driver_task_id]) == 1
task_spec = task_table[driver_task_id][0]["TaskSpec"]
task_spec = task_table[driver_task_id]["TaskSpec"]
assert task_spec["TaskID"] == driver_task_id
assert task_spec["ActorID"] == ray_constants.ID_SIZE * "ff"
@ -2147,7 +2146,7 @@ def test_global_state_api(shutdown_only):
task_id = list(task_id_set)[0]
function_table = ray.global_state.function_table()
task_spec = task_table[task_id][0]["TaskSpec"]
task_spec = task_table[task_id]["TaskSpec"]
assert task_spec["ActorID"] == ray_constants.ID_SIZE * "ff"
assert task_spec["Args"] == [1, "hi", x_id]
assert task_spec["DriverID"] == driver_id
@ -2178,13 +2177,9 @@ def test_global_state_api(shutdown_only):
object_table = ray.global_state.object_table()
assert len(object_table) == 2
assert len(object_table[x_id]) == 1
assert object_table[x_id][0]["IsEviction"] is False
assert object_table[x_id][0]["NumEvictions"] == 0
assert object_table[x_id]["IsEviction"][0] is False
assert len(object_table[result_id]) == 1
assert object_table[result_id][0]["IsEviction"] is False
assert object_table[result_id][0]["NumEvictions"] == 0
assert object_table[result_id]["IsEviction"][0] is False
assert object_table[x_id] == ray.global_state.object_table(x_id)
object_table_entry = ray.global_state.object_table(result_id)
@ -2251,78 +2246,6 @@ def test_workers(shutdown_only):
assert "stdout_file" in info
@pytest.mark.skip("This test does not work yet.")
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="New GCS API doesn't have a Python API yet.")
def test_flush_api(shutdown_only):
ray.init(num_cpus=1)
@ray.remote
def f():
return 1
[ray.put(1) for _ in range(10)]
ray.get([f.remote() for _ in range(10)])
# Wait until all of the task and object information has been stored in
# Redis. Note that since a given key may be updated multiple times
# (e.g., multiple calls to TaskTableUpdate), this is an attempt to wait
# until all updates have happened. Note that in a real application we
# could encounter this kind of issue as well.
while True:
object_table = ray.global_state.object_table()
task_table = ray.global_state.task_table()
tables_ready = True
if len(object_table) != 20:
tables_ready = False
for object_info in object_table.values():
if len(object_info) != 5:
tables_ready = False
if (object_info["ManagerIDs"] is None
or object_info["DataSize"] == -1
or object_info["Hash"] == ""):
tables_ready = False
if len(task_table) != 10 + 1:
tables_ready = False
driver_task_id = ray.utils.binary_to_hex(
ray.worker.global_worker.current_task_id.id())
for info in task_table.values():
if info["State"] != ray.experimental.state.TASK_STATUS_DONE:
if info["TaskSpec"]["TaskID"] != driver_task_id:
tables_ready = False
if tables_ready:
break
# This test case sometimes blocks; adding this sleep may fix the problem.
time.sleep(0.1)
# Flush the tables.
ray.experimental.flush_redis_unsafe()
ray.experimental.flush_task_and_object_metadata_unsafe()
# Make sure the tables are empty.
assert len(ray.global_state.object_table()) == 0
assert len(ray.global_state.task_table()) == 0
# Run some more tasks.
ray.get([f.remote() for _ in range(10)])
while len(ray.global_state.task_table()) != 0:
time.sleep(0.1)
ray.experimental.flush_finished_tasks_unsafe()
# Make sure that we can call this method (but it won't do anything in
# this test case).
ray.experimental.flush_evicted_objects_unsafe()
@pytest.fixture
def shutdown_only_with_initialization_check():
yield None