From d9c68fca5c2e9beb87b4eb460f44c8d6a60c2583 Mon Sep 17 00:00:00 2001
From: Alex Wu
Date: Tue, 8 Sep 2020 20:58:05 -0700
Subject: [PATCH] [Core] Logging improvements (#10625)

* other stuff :

* lint

* .

* .

* lint

* comment

* lint

* .
---
 python/ray/node.py                  | 52 +++++++++++++++++++----------
 python/ray/services.py              | 14 ++++----
 src/ray/core_worker/task_manager.cc |  8 ++---
 src/ray/raylet/node_manager.cc      |  4 +--
 src/ray/raylet/worker_pool.cc       | 12 +++++--
 5 files changed, 55 insertions(+), 35 deletions(-)

diff --git a/python/ray/node.py b/python/ray/node.py
index a65cc1e87..2fd8d46b1 100644
--- a/python/ray/node.py
+++ b/python/ray/node.py
@@ -276,13 +276,14 @@ class Node:
                             key, params_dict[key], env_dict[key]))
             return num_cpus, num_gpus, memory, object_store_memory, result
 
-        env_resources = {}
-        env_string = os.getenv(ray_constants.RESOURCES_ENVIRONMENT_VARIABLE)
-        if env_string:
-            env_resources = json.loads(env_string)
-            logger.info(f"Autoscaler overriding resources: {env_resources}.")
-
         if not self._resource_spec:
+            env_resources = {}
+            env_string = os.getenv(
+                ray_constants.RESOURCES_ENVIRONMENT_VARIABLE)
+            if env_string:
+                env_resources = json.loads(env_string)
+                logger.info(
+                    f"Autoscaler overriding resources: {env_resources}.")
             num_cpus, num_gpus, memory, object_store_memory, resources = \
                 merge_resources(env_resources, self._ray_params.resources)
             self._resource_spec = ResourceSpec(
@@ -649,16 +650,17 @@ class Node:
             redis_client = self.create_redis_client()
             redis_client.hmset("webui", {"url": self._webui_url})
 
-    def start_plasma_store(self):
+    def start_plasma_store(self, plasma_directory, object_store_memory):
         """Start the plasma store."""
         stdout_file, stderr_file = self.get_log_file_handles(
             "plasma_store", unique=True)
         process_info = ray.services.start_plasma_store(
             self.get_resource_spec(),
+            plasma_directory,
+            object_store_memory,
             self._plasma_store_socket_name,
             stdout_file=stdout_file,
             stderr_file=stderr_file,
-            plasma_directory=self._ray_params.plasma_directory,
             huge_pages=self._ray_params.huge_pages,
             keep_idle=bool(self._config.get("plasma_store_as_thread")),
             fate_share=self.kernel_fate_share)
@@ -688,7 +690,11 @@ class Node:
             process_info,
         ]
 
-    def start_raylet(self, use_valgrind=False, use_profiler=False):
+    def start_raylet(self,
+                     plasma_directory,
+                     object_store_memory,
+                     use_valgrind=False,
+                     use_profiler=False):
         """Start the raylet.
 
         Args:
@@ -709,12 +715,14 @@ class Node:
             self._temp_dir,
             self._session_dir,
             self.get_resource_spec(),
-            self._ray_params.min_worker_port,
-            self._ray_params.max_worker_port,
-            self._ray_params.object_manager_port,
-            self._ray_params.redis_password,
-            self._ray_params.metrics_agent_port,
-            self._metrics_export_port,
+            plasma_directory,
+            object_store_memory,
+            min_worker_port=self._ray_params.min_worker_port,
+            max_worker_port=self._ray_params.max_worker_port,
+            object_manager_port=self._ray_params.object_manager_port,
+            redis_password=self._ray_params.redis_password,
+            metrics_agent_port=self._ray_params.metrics_agent_port,
+            metrics_export_port=self._metrics_export_port,
             use_valgrind=use_valgrind,
             use_profiler=use_profiler,
             stdout_file=stdout_file,
@@ -723,7 +731,6 @@ class Node:
             include_java=self._ray_params.include_java,
             java_worker_options=self._ray_params.java_worker_options,
             load_code_from_local=self._ray_params.load_code_from_local,
-            plasma_directory=self._ray_params.plasma_directory,
             huge_pages=self._ray_params.huge_pages,
             fate_share=self.kernel_fate_share,
             socket_to_use=self.socket,
@@ -810,8 +817,17 @@ class Node:
         logger.debug(f"Process STDOUT and STDERR is being "
                      f"redirected to {self._logs_dir}.")
 
-        self.start_plasma_store()
-        self.start_raylet()
+        # Make sure we don't call `determine_plasma_store_config` multiple
+        # times to avoid printing multiple warnings.
+        resource_spec = self.get_resource_spec()
+        plasma_directory, object_store_memory = \
+            ray.services.determine_plasma_store_config(
+                resource_spec.object_store_memory,
+                plasma_directory=self._ray_params.plasma_directory,
+                huge_pages=self._ray_params.huge_pages
+            )
+        self.start_plasma_store(plasma_directory, object_store_memory)
+        self.start_raylet(plasma_directory, object_store_memory)
 
         if "RAY_USE_NEW_DASHBOARD" not in os.environ:
             self.start_reporter()

diff --git a/python/ray/services.py b/python/ray/services.py
index b2b1380a8..4ecd1e3b8 100644
--- a/python/ray/services.py
+++ b/python/ray/services.py
@@ -1256,6 +1256,8 @@ def start_raylet(redis_address,
                  temp_dir,
                  session_dir,
                  resource_spec,
+                 plasma_directory,
+                 object_store_memory,
                  min_worker_port=None,
                  max_worker_port=None,
                  object_manager_port=None,
@@ -1270,7 +1272,6 @@ def start_raylet(redis_address,
                  include_java=False,
                  java_worker_options=None,
                  load_code_from_local=False,
-                 plasma_directory=None,
                  huge_pages=False,
                  fate_share=None,
                  socket_to_use=None,
@@ -1457,8 +1458,6 @@ def start_raylet(redis_address,
                 subprocess.list2cmdline(agent_command)))
     if config.get("plasma_store_as_thread"):
         # command related to the plasma store
-        plasma_directory, object_store_memory = determine_plasma_store_config(
-            resource_spec.object_store_memory, plasma_directory, huge_pages)
         command += [
             f"--object_store_memory={object_store_memory}",
             f"--plasma_directory={plasma_directory}",
@@ -1653,8 +1652,8 @@ def determine_plasma_store_config(object_store_memory,
                 "than the total available memory.")
     else:
         plasma_directory = os.path.abspath(plasma_directory)
-        logger.warning("WARNING: object_store_memory is not verified when "
-                       "plasma_directory is set.")
+        logger.info("object_store_memory is not verified when "
+                    "plasma_directory is set.")
 
     if not os.path.isdir(plasma_directory):
         raise ValueError(f"The file {plasma_directory} does not "
@@ -1680,10 +1679,11 @@ def determine_plasma_store_config(object_store_memory,
 
 
 def start_plasma_store(resource_spec,
+                       plasma_directory,
+                       object_store_memory,
                        plasma_store_socket_name,
                        stdout_file=None,
                        stderr_file=None,
-                       plasma_directory=None,
                       keep_idle=False,
                       huge_pages=False,
                       fate_share=None,
@@ -1712,8 +1712,6 @@ def start_plasma_store(resource_spec,
         raise ValueError("Cannot use valgrind and profiler at the same time.")
 
     assert resource_spec.resolved()
-    plasma_directory, object_store_memory = determine_plasma_store_config(
-        resource_spec.object_store_memory, plasma_directory, huge_pages)
 
     command = [
         PLASMA_STORE_EXECUTABLE,

diff --git a/src/ray/core_worker/task_manager.cc b/src/ray/core_worker/task_manager.cc
index 9ed0ce0e0..8d1fc2eeb 100644
--- a/src/ray/core_worker/task_manager.cc
+++ b/src/ray/core_worker/task_manager.cc
@@ -301,8 +301,8 @@ bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
   if (num_retries_left != 0) {
     auto retries_str =
         num_retries_left == -1 ? "infinite" : std::to_string(num_retries_left);
-    RAY_LOG(ERROR) << retries_str << " retries left for task " << spec.TaskId()
-                   << ", attempting to resubmit.";
+    RAY_LOG(INFO) << retries_str << " retries left for task " << spec.TaskId()
+                  << ", attempting to resubmit.";
     retry_task_callback_(spec, /*delay=*/true);
     will_retry = true;
   } else {
@@ -315,8 +315,8 @@ bool TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
         (current_time_ms() - last_log_time_ms_) >
             kTaskFailureLoggingFrequencyMillis)) {
       if (num_failure_logs_++ == kTaskFailureThrottlingThreshold) {
-        RAY_LOG(ERROR) << "Too many failure logs, throttling to once every "
-                       << kTaskFailureLoggingFrequencyMillis << " millis.";
+        RAY_LOG(WARNING) << "Too many failure logs, throttling to once every "
+                         << kTaskFailureLoggingFrequencyMillis << " millis.";
       }
       last_log_time_ms_ = current_time_ms();
      if (status != nullptr) {
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 7d4fa0961..3e5514bf4 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -2242,11 +2242,11 @@ void NodeManager::MarkObjectsAsFailed(
       // If we failed to save the error code, log a warning and push an error message
       // to the driver.
       std::ostringstream stream;
-      stream << "An plasma error (" << status.ToString() << ") occurred while saving"
+      stream << "A plasma error (" << status.ToString() << ") occurred while saving"
             << " error code to object " << object_id << ". Anyone who's getting this"
             << " object may hang forever.";
      std::string error_message = stream.str();
-      RAY_LOG(WARNING) << error_message;
+      RAY_LOG(ERROR) << error_message;
      auto error_data_ptr =
          gcs::CreateErrorTableData("task", error_message, current_time_ms(), job_id);
      RAY_CHECK_OK(gcs_client_->Errors().AsyncReportJobError(error_data_ptr, nullptr));
diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc
index caf4f67e8..e7ba3374b 100644
--- a/src/ray/raylet/worker_pool.cc
+++ b/src/ray/raylet/worker_pool.cc
@@ -387,9 +387,15 @@ Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_
   argv.push_back(NULL);
   Process child(argv.data(), io_service_, ec, /*decouple=*/false, env);
   if (!child.IsValid() || ec) {
-    // The worker failed to start. This is a fatal error.
-    RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": "
-                   << ec.message();
+    // Error code 24 (EMFILE): too many open files. This is caused by ulimit.
+    if (ec.value() == 24) {
+      RAY_LOG(FATAL) << "Too many workers, failed to create a file. Try setting "
+                     << "`ulimit -n <num_files>` then restart Ray.";
+    } else {
+      // The worker failed to start. This is a fatal error.
+      RAY_LOG(FATAL) << "Failed to start worker with return value " << ec << ": "
+                     << ec.message();
+    }
   }
   return child;
 }
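
Illustrative sketch (not part of the patch): the node.py hunks above follow a "resolve once, pass down" pattern. `determine_plasma_store_config` is called a single time in `Node.start_ray_processes`, and the resulting `plasma_directory` / `object_store_memory` values are handed to both `start_plasma_store` and `start_raylet`, so the "object_store_memory is not verified" warning can only be printed once. The minimal, runnable Python sketch below shows that call shape; every function name in it is a hypothetical stand-in, not the real `ray.services` API.

# Minimal sketch of the "resolve once, pass down" pattern from the node.py
# hunks above. All names here are hypothetical stand-ins for illustration.


def resolve_plasma_config(requested_memory, plasma_directory=None):
    # Pick the object store directory and size once, so any warning about
    # an unverified object_store_memory is emitted a single time.
    directory = plasma_directory or "/tmp"
    memory = requested_memory or 10**9
    if plasma_directory is not None:
        print("object_store_memory is not verified when "
              "plasma_directory is set.")
    return directory, memory


def start_plasma_store(plasma_directory, object_store_memory):
    print(f"plasma store: {plasma_directory}, {object_store_memory} bytes")


def start_raylet(plasma_directory, object_store_memory):
    print(f"raylet: {plasma_directory}, {object_store_memory} bytes")


def start_ray_processes(requested_memory, plasma_directory=None):
    # Resolve the config once, then pass the same values to both callees
    # instead of letting each of them re-derive (and re-warn about) it.
    directory, memory = resolve_plasma_config(requested_memory,
                                              plasma_directory)
    start_plasma_store(directory, memory)
    start_raylet(directory, memory)


if __name__ == "__main__":
    start_ray_processes(requested_memory=200 * 1024 * 1024)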