mirror of
https://github.com/vale981/ray
synced 2025-03-06 10:31:39 -05:00
[GCS] Monitor.py bug fix (#8725)
* comment. * Fix bugs. * Used pubsub message instead. * Added a ray.actors test
This commit is contained in:
parent
0306e4d589
commit
7c43991100
4 changed files with 27 additions and 4 deletions
|
@ -844,7 +844,8 @@ class NodeStats(threading.Thread):
|
||||||
"type": error_data.type
|
"type": error_data.type
|
||||||
})
|
})
|
||||||
elif channel == str(actor_channel):
|
elif channel == str(actor_channel):
|
||||||
gcs_entry = ray.gcs_utils.GcsEntry.FromString(data)
|
gcs_entry = ray.gcs_utils.PubSubMessage.FromString(
|
||||||
|
data)
|
||||||
actor_data = ray.gcs_utils.ActorTableData.FromString(
|
actor_data = ray.gcs_utils.ActorTableData.FromString(
|
||||||
gcs_entry.entries[0])
|
gcs_entry.entries[0])
|
||||||
addr = (actor_data.address.ip_address,
|
addr = (actor_data.address.ip_address,
|
||||||
|
|
|
@ -121,8 +121,8 @@ class Monitor:
|
||||||
unused_channel: The message channel.
|
unused_channel: The message channel.
|
||||||
data: The message data.
|
data: The message data.
|
||||||
"""
|
"""
|
||||||
gcs_entries = ray.gcs_utils.GcsEntry.FromString(data)
|
pub_message = ray.gcs_utils.PubSubMessage.FromString(data)
|
||||||
job_data = gcs_entries.entries[0]
|
job_data = pub_message.data
|
||||||
message = ray.gcs_utils.JobTableData.FromString(job_data)
|
message = ray.gcs_utils.JobTableData.FromString(job_data)
|
||||||
job_id = message.job_id
|
job_id = message.job_id
|
||||||
if message.is_dead:
|
if message.is_dead:
|
||||||
|
|
|
@ -216,7 +216,7 @@ class GlobalState:
|
||||||
|
|
||||||
if actor_id is not None:
|
if actor_id is not None:
|
||||||
actor_id = ray.ActorID(hex_to_binary(actor_id))
|
actor_id = ray.ActorID(hex_to_binary(actor_id))
|
||||||
actor_info = self._aglobal_state_accessor.get_actor_info(actor_id)
|
actor_info = self.global_state_accessor.get_actor_info(actor_id)
|
||||||
if actor_info is None:
|
if actor_info is None:
|
||||||
return {}
|
return {}
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -106,6 +106,28 @@ def test_global_state_actor_table(ray_start_regular):
|
||||||
assert get_state() == dead_state
|
assert get_state() == dead_state
|
||||||
|
|
||||||
|
|
||||||
|
def test_global_state_actor_entry(ray_start_regular):
|
||||||
|
@ray.remote
|
||||||
|
class Actor:
|
||||||
|
def ready(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
# actor table should be empty at first
|
||||||
|
assert len(ray.actors()) == 0
|
||||||
|
|
||||||
|
a = Actor.remote()
|
||||||
|
b = Actor.remote()
|
||||||
|
ray.get(a.ready.remote())
|
||||||
|
ray.get(b.ready.remote())
|
||||||
|
assert len(ray.actors()) == 2
|
||||||
|
a_actor_id = a._actor_id.hex()
|
||||||
|
b_actor_id = b._actor_id.hex()
|
||||||
|
assert ray.actors(actor_id=a_actor_id)["ActorID"] == a_actor_id
|
||||||
|
assert ray.actors(actor_id=a_actor_id)["State"] == 1
|
||||||
|
assert ray.actors(actor_id=b_actor_id)["ActorID"] == b_actor_id
|
||||||
|
assert ray.actors(actor_id=b_actor_id)["State"] == 1
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import pytest
|
import pytest
|
||||||
import sys
|
import sys
|
||||||
|
|
Loading…
Add table
Reference in a new issue