From 0651d3b629e3bb153d3df04a7d7f33bd6d92ef3c Mon Sep 17 00:00:00 2001 From: Richard Liaw Date: Thu, 4 Oct 2018 17:23:17 -0700 Subject: [PATCH] [tune/core] Use Global State API for resources (#3004) --- python/ray/experimental/state.py | 18 +++++++++++++----- python/ray/tune/ray_trial_executor.py | 7 ++++--- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index d91165637..eab71993c 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -1310,9 +1310,19 @@ class GlobalState(object): return dict(resources) + def _live_client_ids(self): + """Returns a set of client IDs corresponding to clients still alive.""" + return { + client["ClientID"] + for client in self.client_table() if client["IsInsertion"] + } + def available_resources(self): """Get the current available cluster resources. + This is different from `cluster_resources` in that this will return + idle (available) resources rather than total resources. + Note that this information can grow stale as tasks start and finish. Returns: @@ -1364,6 +1374,7 @@ class GlobalState(object): if local_scheduler_id not in local_scheduler_ids: del available_resources_by_id[local_scheduler_id] else: + # TODO(rliaw): Is this a fair assumption? # Assumes the number of Redis clients does not change subscribe_clients = [ redis_client.pubsub(ignore_subscribe_messages=True) @@ -1373,7 +1384,7 @@ class GlobalState(object): subscribe_client.subscribe( ray.gcs_utils.XRAY_HEARTBEAT_CHANNEL) - client_ids = {client["ClientID"] for client in self.client_table()} + client_ids = self._live_client_ids() while set(available_resources_by_id.keys()) != client_ids: for subscribe_client in subscribe_clients: @@ -1403,10 +1414,7 @@ class GlobalState(object): available_resources_by_id[client_id] = dynamic_resources # Update clients in cluster - client_ids = { - client["ClientID"] - for client in self.client_table() - } + client_ids = self._live_client_ids() # Remove disconnected clients for client_id in available_resources_by_id.keys(): diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 86f09cda3..acbebb38b 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -213,12 +213,13 @@ class RayTrialExecutor(TrialExecutor): assert self._committed_resources.gpu >= 0 def _update_avail_resources(self): - clients = ray.global_state.client_table() if ray.worker.global_worker.use_raylet: # TODO(rliaw): Remove once raylet flag is swapped - num_cpus = sum(cl['Resources']['CPU'] for cl in clients) - num_gpus = sum(cl['Resources'].get('GPU', 0) for cl in clients) + resources = ray.global_state.cluster_resources() + num_cpus = resources["CPU"] + num_gpus = resources["GPU"] else: + clients = ray.global_state.client_table() local_schedulers = [ entry for client in clients.values() for entry in client if (entry['ClientType'] == 'local_scheduler'