print task statuses in shell (#132)

This commit is contained in:
Robert Nishihara 2016-06-21 13:28:08 -07:00 committed by Philipp Moritz
parent d5fe49584a
commit 41724399ae
4 changed files with 32 additions and 4 deletions

View file

@@ -104,7 +104,7 @@ def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None
ray.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port()))
time.sleep(0.5)
def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None):
def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None, print_task_info=False):
global drivers
if num_workers_per_objstore > 0 and worker_path is None:
raise Exception("Attempting to start a cluster with {} workers per object store, but `worker_path` is None.".format(num_workers_per_objstore))
@@ -134,5 +134,5 @@ def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_
time.sleep(0.5)
return driver_workers
else:
ray.connect(scheduler_address, objstore_addresses[0], address(IP_ADDRESS, new_worker_port()))
ray.connect(scheduler_address, objstore_addresses[0], address(IP_ADDRESS, new_worker_port()), print_task_info=print_task_info)
time.sleep(0.5)

View file

@@ -6,6 +6,7 @@ import typing
import funcsigs
import numpy as np
import pynumbuf
import colorama
import ray
from ray.config import LOG_DIRECTORY, LOG_TIMESTAMP
@@ -18,6 +19,10 @@ class Worker(object):
self.functions = {}
self.handle = None
def set_print_task_info(self, print_task_info):
    """Turn printing of task status summaries on or off for this worker.

    Initializes colorama unconditionally so that the colored status output
    produced elsewhere renders correctly across platforms.
    """
    colorama.init()
    self.print_task_info = print_task_info
def put_object(self, objref, value):
"""Put `value` in the local object store with objref `objref`. This assumes that the value for `objref` has not yet been placed in the local object store."""
if pynumbuf.serializable(value):
@@ -52,11 +57,28 @@ class Worker(object):
"""Tell the scheduler to schedule the execution of the function with name `func_name` with arguments `args`. Retrieve object references for the outputs of the function from the scheduler and immediately return them."""
task_capsule = serialization.serialize_task(self.handle, func_name, args)
objrefs = ray.lib.submit_task(self.handle, task_capsule)
if self.print_task_info:
print_task_info(ray.lib.task_info(self.handle))
return objrefs
# We make `global_worker` a global variable so that there is one worker per worker process.
# NOTE(review): this instantiates Worker at import time as a module-level side
# effect; `connect()` must still be called before the worker is usable
# (its `handle` starts out as None) — confirm against Worker.__init__.
global_worker = Worker()
# This is a helper method. It should not be called by users.
def print_task_info(task_data):
    """Print a single colored summary line of task statuses.

    `task_data` is the dict produced by ray.lib.task_info; the keys read here
    are "num_succeeded" (an int), "running_tasks" and "failed_tasks" (sized
    collections). Categories with a zero count are omitted; if every count is
    zero, nothing is printed.
    """
    # (count, label, color) triples, in the order they should appear.
    categories = [
        (task_data["num_succeeded"], "succeeded", colorama.Fore.BLUE),
        (len(task_data["running_tasks"]), "in progress", colorama.Fore.GREEN),
        (len(task_data["failed_tasks"]), "failed", colorama.Fore.RED),
    ]
    info_strings = [
        "{}{} task{} {}{}".format(color, count, "s" if count > 1 else "", label, colorama.Fore.RESET)
        for count, label, color in categories
        if count > 0
    ]
    if info_strings:
        print(", ".join(info_strings))
def scheduler_info(worker=global_worker):
    """Return whatever ray.lib.scheduler_info reports for this worker's handle."""
    info = ray.lib.scheduler_info(worker.handle)
    return info
@@ -74,7 +96,7 @@ def register_module(module, recursive=False, worker=global_worker):
# elif recursive and isinstance(val, ModuleType):
# register_module(val, recursive, worker)
def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker):
def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker, print_task_info=False):
if hasattr(worker, "handle"):
del worker.handle
worker.handle = ray.lib.create_worker(scheduler_addr, objstore_addr, worker_addr)
@@ -82,17 +104,22 @@ def connect(scheduler_addr, objstore_addr, worker_addr, worker=global_worker):
log_basename = os.path.join(LOG_DIRECTORY, (LOG_TIMESTAMP + "-worker-{}").format(datetime.datetime.now(), worker_addr))
logging.basicConfig(level=logging.DEBUG, format=FORMAT, filename=log_basename + ".log")
ray.lib.set_log_config(log_basename + "-c++.log")
worker.set_print_task_info(print_task_info)
def disconnect(worker=global_worker):
    """Disconnect the given worker (the process-wide global_worker by default)."""
    handle = worker.handle
    ray.lib.disconnect(handle)
def pull(objref, worker=global_worker):
    """Request the object for `objref` and return its value from the local store.

    When the worker was connected with print_task_info=True, a colored task
    status summary is also printed.
    """
    handle = worker.handle
    ray.lib.request_object(handle, objref)
    if worker.print_task_info:
        print_task_info(ray.lib.task_info(handle))
    return worker.get_object(objref)
def push(value, worker=global_worker):
    """Store `value` in the local object store and return its new object reference.

    When the worker was connected with print_task_info=True, a colored task
    status summary is also printed.
    """
    new_objref = ray.lib.get_objref(worker.handle)
    worker.put_object(new_objref, value)
    if worker.print_task_info:
        print_task_info(ray.lib.task_info(worker.handle))
    return new_objref
def main_loop(worker=global_worker):

View file

@@ -4,3 +4,4 @@ subprocess32
boto3
botocore
Pillow
colorama

View file

@@ -840,7 +840,7 @@ PyObject* task_info(PyObject* self, PyObject* args) {
PyObject* dict = PyDict_New();
set_dict_item_and_transfer_ownership(dict, PyString_FromString("failed_tasks"), failed_tasks_list);
set_dict_item_and_transfer_ownership(dict, PyString_FromString("running_tasks"), running_tasks_list);
set_dict_item_and_transfer_ownership(dict, PyString_FromString("number_succeeded"), PyInt_FromLong(reply.num_succeeded()));
set_dict_item_and_transfer_ownership(dict, PyString_FromString("num_succeeded"), PyInt_FromLong(reply.num_succeeded()));
return dict;
}