Simplify plasma store config (#8823)

* simplify plasma store config
This commit is contained in:
Siyuan (Ryans) Zhuang 2020-06-08 20:47:22 -07:00 committed by GitHub
parent db5cc5c8da
commit 2f690d1866
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 92 deletions

View file

@ -566,11 +566,11 @@ class Node:
stdout_file, stderr_file = self.new_log_files("plasma_store")
process_info = ray.services.start_plasma_store(
self.get_resource_spec(),
self._plasma_store_socket_name,
stdout_file=stdout_file,
stderr_file=stderr_file,
plasma_directory=self._ray_params.plasma_directory,
huge_pages=self._ray_params.huge_pages,
plasma_store_socket_name=self._plasma_store_socket_name,
fate_share=self.kernel_fate_share)
assert (
ray_constants.PROCESS_TYPE_PLASMA_STORE not in self.all_processes)

View file

@ -1481,6 +1481,14 @@ def determine_plasma_store_config(object_store_memory,
The plasma directory to use. If it is specified by the user, then that
value will be preserved.
"""
if not isinstance(object_store_memory, int):
object_store_memory = int(object_store_memory)
if huge_pages and not (sys.platform == "linux"
or sys.platform == "linux2"):
raise ValueError("The huge_pages argument is only supported on "
"Linux.")
system_memory = ray.utils.get_system_memory()
# Determine which directory to use. By default, use /tmp on MacOS and
@ -1522,90 +1530,39 @@ def determine_plasma_store_config(object_store_memory,
"The file {} does not exist or is not a directory.".format(
plasma_directory))
return plasma_directory
def _start_plasma_store(plasma_store_memory,
use_valgrind=False,
use_profiler=False,
stdout_file=None,
stderr_file=None,
plasma_directory=None,
huge_pages=False,
socket_name=None,
fate_share=None):
"""Start a plasma store process.
Args:
plasma_store_memory (int): The amount of memory in bytes to start the
plasma store with.
use_valgrind (bool): True if the plasma store should be started inside
of valgrind. If this is True, use_profiler must be False.
use_profiler (bool): True if the plasma store should be started inside
a profiler. If this is True, use_valgrind must be False.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
no redirection should happen, then this should be None.
plasma_directory: A directory where the Plasma memory mapped files will
be created.
huge_pages: a boolean flag indicating whether to start the
Object Store with hugetlbfs support. Requires plasma_directory.
socket_name (str): If provided, it will specify the socket
name used by the plasma store.
Return:
A tuple of the name of the plasma store socket and ProcessInfo for the
plasma store process.
"""
if use_valgrind and use_profiler:
raise ValueError("Cannot use valgrind and profiler at the same time.")
if huge_pages and not (sys.platform == "linux"
or sys.platform == "linux2"):
raise ValueError("The huge_pages argument is only supported on "
"Linux.")
if huge_pages and plasma_directory is None:
raise ValueError("If huge_pages is True, then the "
"plasma_directory argument must be provided.")
if not isinstance(plasma_store_memory, int):
plasma_store_memory = int(plasma_store_memory)
if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES:
raise ValueError("Attempting to cap object store memory usage at {} "
"bytes, but the minimum allowed is {} bytes.".format(
object_store_memory,
ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES))
command = [
PLASMA_STORE_EXECUTABLE,
"-s",
socket_name,
"-m",
str(plasma_store_memory),
]
if plasma_directory is not None:
command += ["-d", plasma_directory]
if huge_pages:
command += ["-h"]
process_info = start_ray_process(
command,
ray_constants.PROCESS_TYPE_PLASMA_STORE,
use_valgrind=use_valgrind,
use_valgrind_profiler=use_profiler,
stdout_file=stdout_file,
stderr_file=stderr_file,
fate_share=fate_share)
return process_info
# Print the object store memory using two decimal places.
logger.debug(
"Determine to start the Plasma object store with {} GB memory "
"using {}.".format(
round(object_store_memory / 10**9, 2), plasma_directory))
return plasma_directory, object_store_memory
def start_plasma_store(resource_spec,
plasma_store_socket_name,
stdout_file=None,
stderr_file=None,
plasma_directory=None,
huge_pages=False,
plasma_store_socket_name=None,
fate_share=None):
fate_share=None,
use_valgrind=False):
"""This method starts an object store process.
Args:
resource_spec (ResourceSpec): Resources for the node.
plasma_store_socket_name (str): The path/name of the plasma
store socket.
stdout_file: A file handle opened for writing to redirect stdout
to. If no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr
@ -1618,33 +1575,33 @@ def start_plasma_store(resource_spec,
Returns:
ProcessInfo for the process that was started.
"""
assert resource_spec.resolved()
object_store_memory = resource_spec.object_store_memory
plasma_directory = determine_plasma_store_config(
object_store_memory, plasma_directory, huge_pages)
if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES:
raise ValueError("Attempting to cap object store memory usage at {} "
"bytes, but the minimum allowed is {} bytes.".format(
object_store_memory,
ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES))
# Print the object store memory using two decimal places.
object_store_memory_str = (object_store_memory / 10**7) / 10**2
logger.debug("Starting the Plasma object store with {} GB memory "
"using {}.".format(
round(object_store_memory_str, 2), plasma_directory))
# Start the Plasma store.
process_info = _start_plasma_store(
object_store_memory,
use_profiler=RUN_PLASMA_STORE_PROFILER,
if use_valgrind and RUN_PLASMA_STORE_PROFILER:
raise ValueError("Cannot use valgrind and profiler at the same time.")
assert resource_spec.resolved()
plasma_directory, object_store_memory = determine_plasma_store_config(
resource_spec.object_store_memory, plasma_directory, huge_pages)
command = [
PLASMA_STORE_EXECUTABLE,
"-s",
plasma_store_socket_name,
"-m",
str(object_store_memory),
]
if plasma_directory is not None:
command += ["-d", plasma_directory]
if huge_pages:
command += ["-h"]
process_info = start_ray_process(
command,
ray_constants.PROCESS_TYPE_PLASMA_STORE,
use_valgrind=use_valgrind,
use_valgrind_profiler=RUN_PLASMA_STORE_PROFILER,
stdout_file=stdout_file,
stderr_file=stderr_file,
plasma_directory=plasma_directory,
huge_pages=huge_pages,
socket_name=plasma_store_socket_name,
fate_share=fate_share)
return process_info