[tune/core] Use Global State API for resources (#3004)

This commit is contained in:
Richard Liaw 2018-10-04 17:23:17 -07:00 committed by Robert Nishihara
parent faa31ae018
commit 0651d3b629
2 changed files with 17 additions and 8 deletions

View file

@ -1310,9 +1310,19 @@ class GlobalState(object):
return dict(resources) return dict(resources)
def _live_client_ids(self):
"""Returns a set of client IDs corresponding to clients still alive."""
return {
client["ClientID"]
for client in self.client_table() if client["IsInsertion"]
}
def available_resources(self): def available_resources(self):
"""Get the current available cluster resources. """Get the current available cluster resources.
This is different from `cluster_resources` in that this will return
idle (available) resources rather than total resources.
Note that this information can grow stale as tasks start and finish. Note that this information can grow stale as tasks start and finish.
Returns: Returns:
@ -1364,6 +1374,7 @@ class GlobalState(object):
if local_scheduler_id not in local_scheduler_ids: if local_scheduler_id not in local_scheduler_ids:
del available_resources_by_id[local_scheduler_id] del available_resources_by_id[local_scheduler_id]
else: else:
# TODO(rliaw): Is this a fair assumption?
# Assumes the number of Redis clients does not change # Assumes the number of Redis clients does not change
subscribe_clients = [ subscribe_clients = [
redis_client.pubsub(ignore_subscribe_messages=True) redis_client.pubsub(ignore_subscribe_messages=True)
@ -1373,7 +1384,7 @@ class GlobalState(object):
subscribe_client.subscribe( subscribe_client.subscribe(
ray.gcs_utils.XRAY_HEARTBEAT_CHANNEL) ray.gcs_utils.XRAY_HEARTBEAT_CHANNEL)
client_ids = {client["ClientID"] for client in self.client_table()} client_ids = self._live_client_ids()
while set(available_resources_by_id.keys()) != client_ids: while set(available_resources_by_id.keys()) != client_ids:
for subscribe_client in subscribe_clients: for subscribe_client in subscribe_clients:
@ -1403,10 +1414,7 @@ class GlobalState(object):
available_resources_by_id[client_id] = dynamic_resources available_resources_by_id[client_id] = dynamic_resources
# Update clients in cluster # Update clients in cluster
client_ids = { client_ids = self._live_client_ids()
client["ClientID"]
for client in self.client_table()
}
# Remove disconnected clients # Remove disconnected clients
for client_id in available_resources_by_id.keys(): for client_id in available_resources_by_id.keys():

View file

@ -213,12 +213,13 @@ class RayTrialExecutor(TrialExecutor):
assert self._committed_resources.gpu >= 0 assert self._committed_resources.gpu >= 0
def _update_avail_resources(self): def _update_avail_resources(self):
clients = ray.global_state.client_table()
if ray.worker.global_worker.use_raylet: if ray.worker.global_worker.use_raylet:
# TODO(rliaw): Remove once raylet flag is swapped # TODO(rliaw): Remove once raylet flag is swapped
num_cpus = sum(cl['Resources']['CPU'] for cl in clients) resources = ray.global_state.cluster_resources()
num_gpus = sum(cl['Resources'].get('GPU', 0) for cl in clients) num_cpus = resources["CPU"]
num_gpus = resources["GPU"]
else: else:
clients = ray.global_state.client_table()
local_schedulers = [ local_schedulers = [
entry for client in clients.values() for entry in client entry for client in clients.values() for entry in client
if (entry['ClientType'] == 'local_scheduler' if (entry['ClientType'] == 'local_scheduler'