[Serve] Enable SO_REUSEPORT beyond hasattr check (#22743)

This commit is contained in:
Simon Mo 2022-05-10 11:24:23 -07:00 committed by GitHub
parent c9cd4a75e8
commit 962e839f13
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 8 deletions

View file

@ -1,5 +1,6 @@
import asyncio
from asyncio.tasks import FIRST_COMPLETED
import os
import logging
import pickle
import socket
@ -21,6 +22,7 @@ from ray.serve.http_util import (
RawASGIResponse,
receive_http_body,
Response,
set_socket_reuse_port,
)
from ray.serve.common import EndpointInfo, EndpointTag
from ray.serve.constants import SERVE_LOGGER_NAME
@ -32,6 +34,9 @@ logger = logging.getLogger(SERVE_LOGGER_NAME)
MAX_REPLICA_FAILURE_RETRIES = 10
DISCONNECT_ERROR_CODE = "disconnection"
SOCKET_REUSE_PORT_ENABLED = (
os.environ.get("SERVE_SOCKET_REUSE_PORT_ENABLED", "1") == "1"
)
async def _send_request_to_handle(handle, scope, receive, send) -> str:
@ -384,13 +389,8 @@ class HTTPProxyActor:
async def run(self):
sock = socket.socket()
# These two socket options will allow multiple process to bind the the
# same port. Kernel will evenly load balance among the port listeners.
# Note: this will only work on Linux.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if SOCKET_REUSE_PORT_ENABLED:
set_socket_reuse_port(sock)
try:
sock.bind((self.host, self.port))
except OSError:

View file

@ -1,7 +1,9 @@
import asyncio
import socket
from dataclasses import dataclass
import inspect
import json
import logging
from typing import Any, Dict, Type
import starlette.responses
@ -10,6 +12,10 @@ from starlette.types import Send, ASGIApp
from fastapi.encoders import jsonable_encoder
from ray.serve.exceptions import RayServeException
from ray.serve.constants import SERVE_LOGGER_NAME
logger = logging.getLogger(SERVE_LOGGER_NAME)
@dataclass
@ -250,3 +256,32 @@ def make_fastapi_class_based_view(fastapi_app, cls: Type) -> None:
if serve_cls is not None and serve_cls != cls:
routes_to_remove.append(route)
fastapi_app.routes[:] = [r for r in fastapi_app.routes if r not in routes_to_remove]
def set_socket_reuse_port(sock: socket.socket) -> bool:
"""Mutate a socket object to allow multiple process listening on the same port.
Returns:
success(bool): whether the setting was successful.
"""
try:
# These two socket options will allow multiple process to bind the the
# same port. Kernel will evenly load balance among the port listeners.
# Note: this will only work on Linux.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, "SO_REUSEPORT"):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
# In some Python binary distribution (e.g., conda py3.6), this flag
# was not present at build time but available in runtime. But
# Python relies on compiler flag to include this in binary.
# Therefore, in the absence of socket.SO_REUSEPORT, we try
# to use `15` which is value in linux kernel.
# https://github.com/torvalds/linux/blob/master/tools/include/uapi/asm-generic/socket.h#L27
else:
sock.setsockopt(socket.SOL_SOCKET, 15, 1)
return True
except Exception as e:
logger.debug(
f"Setting SO_REUSEPORT failed because of {e}. SO_REUSEPORT is disabled."
)
return False

View file

@ -12,6 +12,7 @@ from typing import Optional
import pytest
import pydantic
from ray.serve.http_util import set_socket_reuse_port
import requests
import ray
@ -178,8 +179,27 @@ def test_dedicated_cpu(controller_cpu, num_proxy_cpus, ray_cluster):
ray.shutdown()
def test_set_socket_reuse_port():
sock = socket.socket()
if hasattr(socket, "SO_REUSEPORT"):
# If the flag exists, we should be able to to use it
assert set_socket_reuse_port(sock)
elif sys.platform == "linux":
# If the flag doesn't exist, but we are only mordern version
# of linux, we should be able to force set this flag.
assert set_socket_reuse_port(sock)
else:
# Otherwise, it should graceful fail without exception.
assert not set_socket_reuse_port(sock)
def _reuse_port_is_available():
sock = socket.socket()
return set_socket_reuse_port(sock)
@pytest.mark.skipif(
not hasattr(socket, "SO_REUSEPORT"),
not _reuse_port_is_available(),
reason=(
"Port sharing only works on newer verion of Linux. "
"This test can only be ran when port sharing is supported."