Remove counter from run_function_on_all_workers. Also remove utilities for copying directories across machines. (#1260)

* Remove counter from run_function_on_all_workers. Also remove utilities for copying directories across machines.

* Fix linting.
Robert Nishihara 2017-11-26 18:29:10 -08:00 committed by Philipp Moritz
parent 0b4961b161
commit 2865128df0
4 changed files with 10 additions and 158 deletions

python/ray/experimental/__init__.py

@@ -2,7 +2,6 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-from .utils import copy_directory
 from .tfutils import TensorFlowVariables
 
-__all__ = ["copy_directory", "TensorFlowVariables"]
+__all__ = ["TensorFlowVariables"]

python/ray/experimental/utils.py (deleted)

@@ -1,82 +0,0 @@
-from __future__ import absolute_import
-from __future__ import division
-from __future__ import print_function
-
-import io
-import os
-import tarfile
-import sys
-
-import ray
-
-
-def tarred_directory_as_bytes(source_dir):
-    """Tar a directory and return it as a byte string.
-
-    Args:
-        source_dir (str): The name of the directory to tar.
-
-    Returns:
-        A byte string representing the tarred file.
-    """
-    # Get a BytesIO object.
-    string_file = io.BytesIO()
-    # Create an in-memory tarfile of the source directory.
-    with tarfile.open(mode="w:gz", fileobj=string_file) as tar:
-        tar.add(source_dir, arcname=os.path.basename(source_dir))
-    string_file.seek(0)
-    return string_file.read()
-
-
-def tarred_bytes_to_directory(tarred_bytes, target_dir):
-    """Take a byte string and untar it.
-
-    Args:
-        tarred_bytes (str): A byte string representing the tarred file. This
-            should be the output of tarred_directory_as_bytes.
-        target_dir (str): The directory to create the untarred files in.
-    """
-    string_file = io.BytesIO(tarred_bytes)
-    with tarfile.open(fileobj=string_file) as tar:
-        tar.extractall(path=target_dir)
-
-
-def copy_directory(source_dir, target_dir=None):
-    """Copy a local directory to each machine in the Ray cluster.
-
-    Note that both source_dir and target_dir must have the same basename. For
-    example, source_dir can be /a/b/c and target_dir can be /d/e/c. In this
-    case, the directory /d/e will be added to the Python path of each worker.
-
-    Note that this method is not completely safe to use. For example, workers
-    that do not do the copying and only set their paths (only one worker per
-    node does the copying) may try to execute functions that use the files in
-    the directory being copied before the directory being copied has finished
-    untarring.
-
-    Args:
-        source_dir (str): The directory to copy.
-        target_dir (str): The location to copy it to on the other machines. If
-            this is not provided, the source_dir will be used. If it is
-            provided and is different from source_dir, the source_dir will
-            also be copied to the target_dir location on this machine.
-    """
-    target_dir = source_dir if target_dir is None else target_dir
-    source_dir = os.path.abspath(source_dir)
-    target_dir = os.path.abspath(target_dir)
-    source_basename = os.path.basename(source_dir)
-    target_basename = os.path.basename(target_dir)
-    if source_basename != target_basename:
-        raise Exception("The source_dir and target_dir must have the same "
-                        "base name, {} != {}".format(source_basename,
-                                                     target_basename))
-    tarred_bytes = tarred_directory_as_bytes(source_dir)
-
-    def f(worker_info):
-        if worker_info["counter"] == 0:
-            tarred_bytes_to_directory(tarred_bytes,
-                                      os.path.dirname(target_dir))
-        sys.path.append(os.path.dirname(target_dir))
-    # Run this function on all workers to copy the directory to all nodes and
-    # to add the directory to the Python path of each worker.
-    ray.worker.global_worker.run_function_on_all_workers(f)
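For reference, the tar-in-memory round trip performed by the two deleted helpers is easy to reproduce standalone. The sketch below mirrors their logic under illustrative names and tempfile paths, using only the standard library and no Ray APIs:

    import io
    import os
    import tarfile
    import tempfile

    def tar_directory_as_bytes(source_dir):
        # Write a gzipped tar of source_dir into an in-memory buffer.
        buf = io.BytesIO()
        with tarfile.open(mode="w:gz", fileobj=buf) as tar:
            tar.add(source_dir, arcname=os.path.basename(source_dir))
        buf.seek(0)
        return buf.read()

    def bytes_to_directory(tarred_bytes, target_dir):
        # Unpack the in-memory tarball under target_dir.
        with tarfile.open(fileobj=io.BytesIO(tarred_bytes)) as tar:
            tar.extractall(path=target_dir)

    # Round-trip demo; directory names are illustrative.
    src = tempfile.mkdtemp()
    with open(os.path.join(src, "file.txt"), "w") as f:
        f.write("hello world")
    payload = tar_directory_as_bytes(src)

    dst = tempfile.mkdtemp()
    bytes_to_directory(payload, dst)
    copied = os.path.join(dst, os.path.basename(src), "file.txt")
    assert open(copied).read() == "hello world"

Shipping the tarball as a byte string is what let copy_directory piggyback on run_function_on_all_workers: the bytes were captured in the closure of f and traveled with the pickled function to every worker.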

python/ray/worker.py

@@ -578,14 +578,9 @@ class Worker(object):
         function_to_run_id = hashlib.sha1(pickled_function).digest()
         key = b"FunctionsToRun:" + function_to_run_id
-        # First run the function on the driver. Pass in the number of
-        # workers on this node that have already started executing this
-        # remote function, and increment that value. Subtract 1 so that the
-        # counter starts at 0.
-        counter = self.redis_client.hincrby(self.node_ip_address,
-                                            key, 1) - 1
-        function({"counter": counter, "worker": self})
+        # First run the function on the driver.
+        # We always run the task locally.
+        function({"worker": self})
         # Check if the function has already been put into redis.
         function_exported = self.redis_client.setnx(b"Lock:" + key, 1)
         if not function_exported:
@@ -1575,15 +1570,11 @@ def fetch_and_execute_function_to_run(key, worker=global_worker):
     """Run an arbitrary function on the worker."""
     driver_id, serialized_function = worker.redis_client.hmget(
         key, ["driver_id", "function"])
-    # Get the number of workers on this node that have already started
-    # executing this remote function, and increment that value. Subtract 1 so
-    # the counter starts at 0.
-    counter = worker.redis_client.hincrby(worker.node_ip_address, key, 1) - 1
     try:
         # Deserialize the function.
         function = pickle.loads(serialized_function)
         # Run the function.
-        function({"counter": counter, "worker": worker})
+        function({"worker": worker})
     except Exception:
         # If an exception was thrown when the function was run, we record the
         # traceback and notify the scheduler of the failure.
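The net API change in this file: callbacks passed to run_function_on_all_workers now receive {"worker": ...} rather than {"counter": ..., "worker": ...}, so anything that used the counter to act once per node (as the deleted copy_directory did) must provide its own guard. A minimal sketch of a callback written against the post-change signature; the appended path is illustrative:

    import sys

    import ray

    def setup(worker_info):
        # worker_info now carries only the worker object.
        sys.path.append("/tmp/my_shared_code")  # illustrative path

    ray.init()
    ray.worker.global_worker.run_function_on_all_workers(setup)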

test/runtest.py

@@ -2,7 +2,6 @@ from __future__ import absolute_import, division, print_function
 import os
 import re
-import shutil
 import string
 import sys
 import time
@@ -1062,69 +1061,17 @@ class PythonModeTest(unittest.TestCase):
         ray.worker.cleanup()
 
 
-class UtilsTest(unittest.TestCase):
-    def testCopyingDirectory(self):
-        # The functionality being tested here is really multi-node
-        # functionality, but this test just uses a single node.
-        ray.init(num_workers=1)
-
-        source_text = "hello world"
-
-        temp_dir1 = os.path.join(os.path.dirname(__file__), "temp_dir1")
-        source_dir = os.path.join(temp_dir1, "dir")
-        source_file = os.path.join(source_dir, "file.txt")
-        temp_dir2 = os.path.join(os.path.dirname(__file__), "temp_dir2")
-        target_dir = os.path.join(temp_dir2, "dir")
-        target_file = os.path.join(target_dir, "file.txt")
-
-        def remove_temporary_files():
-            if os.path.exists(temp_dir1):
-                shutil.rmtree(temp_dir1)
-            if os.path.exists(temp_dir2):
-                shutil.rmtree(temp_dir2)
-
-        # Remove the relevant files if they are left over from a previous run
-        # of this test.
-        remove_temporary_files()
-
-        # Create the source files.
-        os.mkdir(temp_dir1)
-        os.mkdir(source_dir)
-        with open(source_file, "w") as f:
-            f.write(source_text)
-
-        # Copy the source directory to the target directory.
-        ray.experimental.copy_directory(source_dir, target_dir)
-        time.sleep(0.5)
-
-        # Check that the target files exist and are the same as the source
-        # files.
-        self.assertTrue(os.path.exists(target_dir))
-        self.assertTrue(os.path.exists(target_file))
-        with open(target_file, "r") as f:
-            self.assertEqual(f.read(), source_text)
-
-        # Remove the relevant files to clean up.
-        remove_temporary_files()
-
-        ray.worker.cleanup()
-
-
 class ResourcesTest(unittest.TestCase):
     def testResourceConstraints(self):
         num_workers = 20
         ray.init(num_workers=num_workers, num_cpus=10, num_gpus=2)
-        # Attempt to wait for all of the workers to start up.
-        ray.worker.global_worker.run_function_on_all_workers(
-            lambda worker_info: sys.path.append(worker_info["counter"]))
 
         @ray.remote(num_cpus=0)
         def get_worker_id():
-            time.sleep(1)
-            return sys.path[-1]
+            time.sleep(0.1)
+            return os.getpid()
 
         # Attempt to wait for all of the workers to start up.
         while True:
             if len(
                 set(
@@ -1196,15 +1143,12 @@ class ResourcesTest(unittest.TestCase):
         num_workers = 20
         ray.init(num_workers=num_workers, num_cpus=10, num_gpus=10)
-        # Attempt to wait for all of the workers to start up.
-        ray.worker.global_worker.run_function_on_all_workers(
-            lambda worker_info: sys.path.append(worker_info["counter"]))
 
         @ray.remote(num_cpus=0)
         def get_worker_id():
-            time.sleep(1)
-            return sys.path[-1]
+            time.sleep(0.1)
+            return os.getpid()
 
         # Attempt to wait for all of the workers to start up.
         while True:
             if len(
                 set(
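Both hunks above are cut off inside the while loop. The presumed shape of the updated wait, consistent with the new get_worker_id, is sketched here; the loop body is an assumption rather than a quote from the commit, and ray.init(num_workers=...) is the 2017-era API:

    import os
    import time

    import ray

    ray.init(num_workers=20, num_cpus=10, num_gpus=2)

    @ray.remote(num_cpus=0)
    def get_worker_id():
        # Sleep briefly so the tasks spread across distinct workers.
        time.sleep(0.1)
        return os.getpid()

    # Attempt to wait for all of the workers to start up by collecting
    # distinct worker process ids.
    while True:
        pids = set(ray.get([get_worker_id.remote() for _ in range(20)]))
        if len(pids) == 20:
            break

Identifying workers by pid replaces the old trick of appending worker_info["counter"] to sys.path and reading it back with sys.path[-1], which no longer works once the counter is gone.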