[Core] Logging improvements (#10625)
* other stuff
* lint
* .
* .
* lint
* comment
* lint
* .
parent b7040f1310
commit d9c68fca5c
5 changed files with 55 additions and 35 deletions
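In summary, this commit adjusts log levels to better reflect severity (task-retry notices drop from ERROR to INFO, the failure-log throttle notice drops from ERROR to WARNING, and a failed plasma error-code save that can hang readers is promoted from WARNING to ERROR), replaces a raw errno-24 worker-start failure with an actionable `ulimit -n` hint, and hoists `determine_plasma_store_config` out of `start_plasma_store`/`start_raylet` so it runs once per node startup and its warnings print only once.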
python/ray/node.py

@@ -276,13 +276,14 @@ class Node:
                     key, params_dict[key], env_dict[key]))
         return num_cpus, num_gpus, memory, object_store_memory, result
 
-        env_resources = {}
-        env_string = os.getenv(ray_constants.RESOURCES_ENVIRONMENT_VARIABLE)
-        if env_string:
-            env_resources = json.loads(env_string)
-            logger.info(f"Autoscaler overriding resources: {env_resources}.")
-
         if not self._resource_spec:
+            env_resources = {}
+            env_string = os.getenv(
+                ray_constants.RESOURCES_ENVIRONMENT_VARIABLE)
+            if env_string:
+                env_resources = json.loads(env_string)
+                logger.info(
+                    f"Autoscaler overriding resources: {env_resources}.")
             num_cpus, num_gpus, memory, object_store_memory, resources = \
                 merge_resources(env_resources, self._ray_params.resources)
             self._resource_spec = ResourceSpec(

@@ -649,16 +650,17 @@ class Node:
             redis_client = self.create_redis_client()
             redis_client.hmset("webui", {"url": self._webui_url})
 
-    def start_plasma_store(self):
+    def start_plasma_store(self, plasma_directory, object_store_memory):
         """Start the plasma store."""
         stdout_file, stderr_file = self.get_log_file_handles(
             "plasma_store", unique=True)
         process_info = ray.services.start_plasma_store(
             self.get_resource_spec(),
+            plasma_directory,
+            object_store_memory,
             self._plasma_store_socket_name,
             stdout_file=stdout_file,
             stderr_file=stderr_file,
-            plasma_directory=self._ray_params.plasma_directory,
             huge_pages=self._ray_params.huge_pages,
             keep_idle=bool(self._config.get("plasma_store_as_thread")),
             fate_share=self.kernel_fate_share)

@@ -688,7 +690,11 @@ class Node:
             process_info,
         ]
 
-    def start_raylet(self, use_valgrind=False, use_profiler=False):
+    def start_raylet(self,
+                     plasma_directory,
+                     object_store_memory,
+                     use_valgrind=False,
+                     use_profiler=False):
         """Start the raylet.
 
         Args:

@@ -709,12 +715,14 @@ class Node:
             self._temp_dir,
             self._session_dir,
             self.get_resource_spec(),
-            self._ray_params.min_worker_port,
-            self._ray_params.max_worker_port,
-            self._ray_params.object_manager_port,
-            self._ray_params.redis_password,
-            self._ray_params.metrics_agent_port,
-            self._metrics_export_port,
+            plasma_directory,
+            object_store_memory,
+            min_worker_port=self._ray_params.min_worker_port,
+            max_worker_port=self._ray_params.max_worker_port,
+            object_manager_port=self._ray_params.object_manager_port,
+            redis_password=self._ray_params.redis_password,
+            metrics_agent_port=self._ray_params.metrics_agent_port,
+            metrics_export_port=self._metrics_export_port,
             use_valgrind=use_valgrind,
             use_profiler=use_profiler,
             stdout_file=stdout_file,

@@ -723,7 +731,6 @@ class Node:
             include_java=self._ray_params.include_java,
             java_worker_options=self._ray_params.java_worker_options,
             load_code_from_local=self._ray_params.load_code_from_local,
-            plasma_directory=self._ray_params.plasma_directory,
             huge_pages=self._ray_params.huge_pages,
             fate_share=self.kernel_fate_share,
             socket_to_use=self.socket,

@@ -810,8 +817,17 @@ class Node:
         logger.debug(f"Process STDOUT and STDERR is being "
                      f"redirected to {self._logs_dir}.")
 
-        self.start_plasma_store()
-        self.start_raylet()
+        # Make sure we don't call `determine_plasma_store_config` multiple
+        # times to avoid printing multiple warnings.
+        resource_spec = self.get_resource_spec()
+        plasma_directory, object_store_memory = \
+            ray.services.determine_plasma_store_config(
+                resource_spec.object_store_memory,
+                plasma_directory=self._ray_params.plasma_directory,
+                huge_pages=self._ray_params.huge_pages
+            )
+        self.start_plasma_store(plasma_directory, object_store_memory)
+        self.start_raylet(plasma_directory, object_store_memory)
         if "RAY_USE_NEW_DASHBOARD" not in os.environ:
             self.start_reporter()
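The node.py hunks implement a compute-once, pass-down pattern: the plasma store configuration is resolved a single time in the startup path, and the resulting `plasma_directory` and `object_store_memory` are threaded into both `start_plasma_store` and `start_raylet`, so any log line emitted during resolution appears at most once. A minimal sketch of the pattern, with illustrative names rather than Ray's real API:

    import logging

    logger = logging.getLogger(__name__)

    def determine_store_config(requested_memory, directory=None):
        # Resolution may log; resolve once and pass the result down.
        if directory is not None:
            logger.info("object_store_memory is not verified when "
                        "a directory is set.")
        return directory or "/dev/shm", requested_memory or 10**9

    class Node:
        def start_store(self, directory, memory):
            print(f"store: {directory} ({memory} bytes)")

        def start_raylet(self, directory, memory):
            print(f"raylet: {directory} ({memory} bytes)")

        def start_head_processes(self):
            # Resolve once so the log line above is emitted at most once.
            directory, memory = determine_store_config(requested_memory=None)
            self.start_store(directory, memory)
            self.start_raylet(directory, memory)

    Node().start_head_processes()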
python/ray/services.py

@@ -1256,6 +1256,8 @@ def start_raylet(redis_address,
                  temp_dir,
                  session_dir,
                  resource_spec,
+                 plasma_directory,
+                 object_store_memory,
                  min_worker_port=None,
                  max_worker_port=None,
                  object_manager_port=None,

@@ -1270,7 +1272,6 @@ def start_raylet(redis_address,
                  include_java=False,
                  java_worker_options=None,
                  load_code_from_local=False,
-                 plasma_directory=None,
                  huge_pages=False,
                  fate_share=None,
                  socket_to_use=None,

@@ -1457,8 +1458,6 @@ def start_raylet(redis_address,
                         subprocess.list2cmdline(agent_command)))
     if config.get("plasma_store_as_thread"):
         # command related to the plasma store
-        plasma_directory, object_store_memory = determine_plasma_store_config(
-            resource_spec.object_store_memory, plasma_directory, huge_pages)
         command += [
             f"--object_store_memory={object_store_memory}",
             f"--plasma_directory={plasma_directory}",

@@ -1653,8 +1652,8 @@ def determine_plasma_store_config(object_store_memory,
                 "than the total available memory.")
     else:
         plasma_directory = os.path.abspath(plasma_directory)
-        logger.warning("WARNING: object_store_memory is not verified when "
-                       "plasma_directory is set.")
+        logger.info("object_store_memory is not verified when "
+                    "plasma_directory is set.")
 
     if not os.path.isdir(plasma_directory):
         raise ValueError(f"The file {plasma_directory} does not "

@@ -1680,10 +1679,11 @@ def determine_plasma_store_config(object_store_memory,
 
 
 def start_plasma_store(resource_spec,
+                       plasma_directory,
+                       object_store_memory,
                        plasma_store_socket_name,
                        stdout_file=None,
                        stderr_file=None,
-                       plasma_directory=None,
                        keep_idle=False,
                        huge_pages=False,
                        fate_share=None,

@@ -1712,8 +1712,6 @@ def start_plasma_store(resource_spec,
         raise ValueError("Cannot use valgrind and profiler at the same time.")
 
     assert resource_spec.resolved()
-    plasma_directory, object_store_memory = determine_plasma_store_config(
-        resource_spec.object_store_memory, plasma_directory, huge_pages)
 
     command = [
         PLASMA_STORE_EXECUTABLE,
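A detail worth noting in the services.py signatures: two new required positional parameters are inserted ahead of several existing optional ones, and the node.py call site simultaneously switches its trailing arguments to keyword form. That is the safe move, since a stale positional call would bind values to the wrong parameters without raising. A hypothetical before/after sketch (not Ray's actual signatures):

    # Hypothetical v1 signature.
    def start_raylet_v1(resource_spec, min_worker_port=None, max_worker_port=None):
        return resource_spec, min_worker_port, max_worker_port

    # Hypothetical v2: two required parameters inserted before the optional ones.
    def start_raylet_v2(resource_spec, plasma_directory, object_store_memory,
                        min_worker_port=None, max_worker_port=None):
        return (plasma_directory, object_store_memory,
                min_worker_port, max_worker_port)

    # A v1-style positional call, start_raylet_v2(spec, 10000, 10999), would
    # silently treat 10000 as the plasma directory. Keyword arguments keep
    # the call site unambiguous:
    print(start_raylet_v2("spec", "/dev/shm", 10**9,
                          min_worker_port=10000, max_worker_port=10999))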
src/ray/core_worker/task_manager.cc

@@ -301,8 +301,8 @@ bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
   if (num_retries_left != 0) {
     auto retries_str =
         num_retries_left == -1 ? "infinite" : std::to_string(num_retries_left);
-    RAY_LOG(ERROR) << retries_str << " retries left for task " << spec.TaskId()
-                   << ", attempting to resubmit.";
+    RAY_LOG(INFO) << retries_str << " retries left for task " << spec.TaskId()
+                  << ", attempting to resubmit.";
     retry_task_callback_(spec, /*delay=*/true);
     will_retry = true;
   } else {

@@ -315,8 +315,8 @@ bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
          (current_time_ms() - last_log_time_ms_) >
              kTaskFailureLoggingFrequencyMillis)) {
       if (num_failure_logs_++ == kTaskFailureThrottlingThreshold) {
-        RAY_LOG(ERROR) << "Too many failure logs, throttling to once every "
-                       << kTaskFailureLoggingFrequencyMillis << " millis.";
+        RAY_LOG(WARNING) << "Too many failure logs, throttling to once every "
+                         << kTaskFailureLoggingFrequencyMillis << " millis.";
       }
       last_log_time_ms_ = current_time_ms();
       if (status != nullptr) {
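The second task_manager.cc hunk sits inside a count-and-time throttle: failure messages log freely up to kTaskFailureThrottlingThreshold, after which they are limited to one per kTaskFailureLoggingFrequencyMillis, with a one-time notice (now WARNING rather than ERROR) when the throttle kicks in. A rough Python equivalent of that throttle, with made-up constant values:

    import time

    TASK_FAILURE_THROTTLING_THRESHOLD = 50        # assumed value
    TASK_FAILURE_LOGGING_FREQUENCY_MILLIS = 1000  # assumed value

    class FailureLogThrottle:
        def __init__(self):
            self.num_failure_logs = 0
            self.last_log_time_ms = 0.0

        def should_log(self):
            now_ms = time.monotonic() * 1000
            # Log freely below the threshold; afterwards, at most once
            # per time window.
            if (self.num_failure_logs < TASK_FAILURE_THROTTLING_THRESHOLD
                    or now_ms - self.last_log_time_ms >
                    TASK_FAILURE_LOGGING_FREQUENCY_MILLIS):
                if self.num_failure_logs == TASK_FAILURE_THROTTLING_THRESHOLD:
                    print("Too many failure logs, throttling to once every "
                          f"{TASK_FAILURE_LOGGING_FREQUENCY_MILLIS} millis.")
                self.num_failure_logs += 1
                self.last_log_time_ms = now_ms
                return True
            return False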
src/ray/raylet/node_manager.cc

@@ -2242,11 +2242,11 @@ void NodeManager::MarkObjectsAsFailed(
       // If we failed to save the error code, log a warning and push an error message
       // to the driver.
       std::ostringstream stream;
-      stream << "An plasma error (" << status.ToString() << ") occurred while saving"
+      stream << "A plasma error (" << status.ToString() << ") occurred while saving"
             << " error code to object " << object_id << ". Anyone who's getting this"
             << " object may hang forever.";
      std::string error_message = stream.str();
-      RAY_LOG(WARNING) << error_message;
+      RAY_LOG(ERROR) << error_message;
      auto error_data_ptr =
          gcs::CreateErrorTableData("task", error_message, current_time_ms(), job_id);
      RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
src/ray/raylet/worker_pool.cc

@@ -387,9 +387,15 @@ Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_
   argv.push_back(NULL);
   Process child(argv.data(), io_service_, ec, /*decouple=*/false, env);
   if (!child.IsValid() || ec) {
-    // The worker failed to start. This is a fatal error.
-    RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": "
-                   << ec.message();
+    // errorcode 24: Too many files. This is caused by ulimit.
+    if (ec.value() == 24) {
+      RAY_LOG(FATAL) << "Too many workers, failed to create a file. Try setting "
+                     << "`ulimit -n <num_files>` then restart Ray.";
+    } else {
+      // The worker failed to start. This is a fatal error.
+      RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": "
+                     << ec.message();
+    }
   }
   return child;
 }
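On Linux, error code 24 is EMFILE ("too many open files"), typically hit when the per-process file descriptor limit set by ulimit is exhausted; the new branch converts that raw errno into an actionable hint. In Python the same check would be written against errno.EMFILE rather than the literal 24; a sketch of the idea, not Ray code:

    import errno
    import subprocess

    def start_worker(command):
        """Launch a worker process, translating EMFILE into an actionable error."""
        try:
            return subprocess.Popen(command)
        except OSError as e:
            if e.errno == errno.EMFILE:  # errno 24: too many open files (ulimit)
                raise RuntimeError(
                    "Too many workers, failed to create a file. Try setting "
                    "`ulimit -n <num_files>` then restart Ray.") from e
            raise RuntimeError(
                f"Failed to start worker with return value {e.errno}: "
                f"{e.strerror}") from e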