ray/release/ray_release/file_manager/remote_task.py
Kai Fricke fe27dbcd9a
[air/release] Improve file packing/unpacking (#23621)
We use tarfile to pack/unpack directories in several locations. Instead of using temporary files, we can just use io.BytesIO to avoid unnecessary disk writes.

Note that this functionality is present in three different modules: Ray (AIR), the release test package, and a specific release test. The implementations should live in the three modules independently, so we don't add a common utility for this (e.g., the ray_release package should be independent of the Ray package).
2022-04-01 07:38:14 -07:00

82 lines
2 KiB
Python

import io
import tarfile
from typing import Optional
from ray_release.file_manager.file_manager import FileManager
def _pack(source_dir: str) -> bytes:
stream = io.BytesIO()
with tarfile.open(fileobj=stream, mode="w:gz", format=tarfile.PAX_FORMAT) as tar:
tar.add(source_dir, arcname="")
return stream.getvalue()
def _unpack(stream: bytes, target_dir: str):
with tarfile.open(fileobj=io.BytesIO(stream)) as tar:
tar.extractall(target_dir)
def send_dir_to_node(
    node_ip: str,
    local_dir: str,
    remote_dir: str,
):
    """Copy the contents of ``local_dir`` into ``remote_dir`` on ``node_ip``.

    The directory is packed locally with ``_pack`` and unpacked on the target
    node by a Ray task running ``_unpack``.

    This is best-effort: any exception is caught and reported as a printed
    warning instead of being raised.

    Args:
        node_ip: IP address of the node to send the directory to.
        local_dir: Local directory whose contents are sent.
        remote_dir: Destination directory on the remote node.
    """
    import ray

    try:
        payload = _pack(local_dir)
        # Requesting a small fraction of the ``node:<ip>`` resource is how the
        # unpack task is scheduled onto the target node.
        pinned_unpack = ray.remote(resources={f"node:{node_ip}": 0.01})(_unpack)
        ray.get(pinned_unpack.remote(payload, remote_dir))
    except Exception as err:
        print(
            "Warning: Could not send remote directory contents. Message: "
            + str(err)
        )
def fetch_dir_from_node(
    node_ip: str,
    remote_dir: str,
    local_dir: str,
):
    """Copy the contents of ``remote_dir`` on ``node_ip`` into ``local_dir``.

    A Ray task running ``_pack`` on the target node archives the directory,
    and the returned bytes are extracted locally with ``_unpack``.

    This is best-effort: any exception is caught and reported as a printed
    warning instead of being raised.

    Args:
        node_ip: IP address of the node to fetch the directory from.
        remote_dir: Source directory on the remote node.
        local_dir: Local directory to extract the contents into.
    """
    import ray

    try:
        # The ``node:<ip>`` resource request schedules the pack task onto the
        # node that holds ``remote_dir``.
        pinned_pack = ray.remote(resources={f"node:{node_ip}": 0.01})(_pack)
        archive = ray.get(pinned_pack.remote(remote_dir))
        _unpack(archive, local_dir)
    except Exception as err:
        message = str(err)
        print(
            "Warning: Could not fetch remote directory contents. "
            f"Message: {message}"
        )
def _get_head_ip():
    """Return the IP address of the node this function executes on.

    Despite its name, nothing here restricts where it runs; callers launch it
    as a Ray task and use the result as the "head" IP.
    """
    import ray

    node_ip = ray.util.get_node_ip_address()
    return node_ip
def send_dir_to_head(local_dir: str, remote_dir: str):
    """Copy ``local_dir`` into ``remote_dir`` on the cluster's "head" node.

    The target IP is resolved by running ``_get_head_ip`` as a Ray task, and
    the copy is then delegated to ``send_dir_to_node``.

    NOTE(review): the ``_get_head_ip`` task is submitted without any placement
    constraint, so the IP is that of whichever node Ray schedules it on —
    presumably the head node in the intended setup; confirm.
    """
    import ray

    head_ip_ref = ray.remote(_get_head_ip).remote()
    head_ip = ray.get(head_ip_ref)
    return send_dir_to_node(head_ip, local_dir, remote_dir)
def fetch_dir_fom_head(local_dir: str, remote_dir: str):
    """Copy ``remote_dir`` on the cluster's "head" node into ``local_dir``.

    The source IP is resolved by running ``_get_head_ip`` as a Ray task, and
    the copy is then delegated to ``fetch_dir_from_node``.

    Note: the name is misspelled ("fom"); it is kept as-is because callers
    may reference it.

    NOTE(review): the ``_get_head_ip`` task is submitted without any placement
    constraint, so the IP is that of whichever node Ray schedules it on —
    presumably the head node in the intended setup; confirm.
    """
    import ray

    head_ip_ref = ray.remote(_get_head_ip).remote()
    head_ip = ray.get(head_ip_ref)
    return fetch_dir_from_node(head_ip, remote_dir, local_dir)
class RemoteTaskFileManager(FileManager):
    """File manager that moves directories to and from the cluster via Ray tasks.

    Directories are packed into in-memory tarballs and shipped through Ray
    rather than through external storage.
    """

    def upload(self, source: Optional[str] = None, target: Optional[str] = None):
        """Copy the local ``source`` directory to ``target`` on the head node.

        Args:
            source: Local directory to upload.
            target: Destination directory on the head node.
        """
        send_dir_to_head(local_dir=source, remote_dir=target)

    def download(self, source: str, target: str):
        """Copy the remote ``source`` directory on the head node to local ``target``.

        Args:
            source: Directory on the head node to download.
            target: Local directory to extract the contents into.
        """
        # ``fetch_dir_fom_head`` takes (local_dir, remote_dir). Pass keywords so
        # the remote ``source`` is read and the local ``target`` is written,
        # mirroring ``upload``'s source -> target direction.
        fetch_dir_fom_head(local_dir=target, remote_dir=source)