From 962e839f136f5dfed29aefc5769e1723d7919eb7 Mon Sep 17 00:00:00 2001 From: Simon Mo Date: Tue, 10 May 2022 11:24:23 -0700 Subject: [PATCH] [Serve] Enable SO_REUSEPORT beyond hasattr check (#22743) --- python/ray/serve/http_proxy.py | 14 ++++----- python/ray/serve/http_util.py | 35 +++++++++++++++++++++++ python/ray/serve/tests/test_standalone.py | 22 +++++++++++++- 3 files changed, 63 insertions(+), 8 deletions(-) diff --git a/python/ray/serve/http_proxy.py b/python/ray/serve/http_proxy.py index 171dd131c..b15a25d37 100644 --- a/python/ray/serve/http_proxy.py +++ b/python/ray/serve/http_proxy.py @@ -1,5 +1,6 @@ import asyncio from asyncio.tasks import FIRST_COMPLETED +import os import logging import pickle import socket @@ -21,6 +22,7 @@ from ray.serve.http_util import ( RawASGIResponse, receive_http_body, Response, + set_socket_reuse_port, ) from ray.serve.common import EndpointInfo, EndpointTag from ray.serve.constants import SERVE_LOGGER_NAME @@ -32,6 +34,9 @@ logger = logging.getLogger(SERVE_LOGGER_NAME) MAX_REPLICA_FAILURE_RETRIES = 10 DISCONNECT_ERROR_CODE = "disconnection" +SOCKET_REUSE_PORT_ENABLED = ( + os.environ.get("SERVE_SOCKET_REUSE_PORT_ENABLED", "1") == "1" +) async def _send_request_to_handle(handle, scope, receive, send) -> str: @@ -384,13 +389,8 @@ class HTTPProxyActor: async def run(self): sock = socket.socket() - # These two socket options will allow multiple process to bind the the - # same port. Kernel will evenly load balance among the port listeners. - # Note: this will only work on Linux. 
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - if hasattr(socket, "SO_REUSEPORT"): - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - + if SOCKET_REUSE_PORT_ENABLED: + set_socket_reuse_port(sock) try: sock.bind((self.host, self.port)) except OSError: diff --git a/python/ray/serve/http_util.py b/python/ray/serve/http_util.py index 4c69a7945..795e30d12 100644 --- a/python/ray/serve/http_util.py +++ b/python/ray/serve/http_util.py @@ -1,7 +1,9 @@ import asyncio +import socket from dataclasses import dataclass import inspect import json +import logging from typing import Any, Dict, Type import starlette.responses @@ -10,6 +12,10 @@ from starlette.types import Send, ASGIApp from fastapi.encoders import jsonable_encoder from ray.serve.exceptions import RayServeException +from ray.serve.constants import SERVE_LOGGER_NAME + + +logger = logging.getLogger(SERVE_LOGGER_NAME) @dataclass @@ -250,3 +256,32 @@ def make_fastapi_class_based_view(fastapi_app, cls: Type) -> None: if serve_cls is not None and serve_cls != cls: routes_to_remove.append(route) fastapi_app.routes[:] = [r for r in fastapi_app.routes if r not in routes_to_remove] + + +def set_socket_reuse_port(sock: socket.socket) -> bool: + """Mutate a socket object to allow multiple processes to listen on the same port. + + Returns: + success(bool): whether the setting was successful. + """ + try: + # These two socket options will allow multiple processes to bind to the + # same port. Kernel will evenly load balance among the port listeners. + # Note: this will only work on Linux. + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if hasattr(socket, "SO_REUSEPORT"): + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + # In some Python binary distributions (e.g., conda py3.6), this flag + # was not present at build time but available at runtime. But + # Python relies on a compiler flag to include this in the binary. 
+ # Therefore, in the absence of socket.SO_REUSEPORT, we try + # to use `15`, which is its value in the Linux kernel. + # https://github.com/torvalds/linux/blob/master/tools/include/uapi/asm-generic/socket.h#L27 + else: + sock.setsockopt(socket.SOL_SOCKET, 15, 1) + return True + except Exception as e: + logger.debug( + f"Setting SO_REUSEPORT failed because of {e}. SO_REUSEPORT is disabled." + ) + return False diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index 6581e0e91..20357ed93 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -12,6 +12,7 @@ from typing import Optional import pytest import pydantic +from ray.serve.http_util import set_socket_reuse_port import requests import ray @@ -178,8 +179,27 @@ def test_dedicated_cpu(controller_cpu, num_proxy_cpus, ray_cluster): ray.shutdown() +def test_set_socket_reuse_port(): + sock = socket.socket() + if hasattr(socket, "SO_REUSEPORT"): + # If the flag exists, we should be able to use it + assert set_socket_reuse_port(sock) + elif sys.platform == "linux": + # If the flag doesn't exist, but we are on a modern version + # of Linux, we should be able to force set this flag. + assert set_socket_reuse_port(sock) + else: + # Otherwise, it should fail gracefully without raising. + assert not set_socket_reuse_port(sock) + + +def _reuse_port_is_available(): + sock = socket.socket() + return set_socket_reuse_port(sock) + + @pytest.mark.skipif( - not hasattr(socket, "SO_REUSEPORT"), + not _reuse_port_is_available(), reason=( "Port sharing only works on newer verion of Linux. " "This test can only be ran when port sharing is supported."