## Why are these changes needed?
This PR ports concurrency group functionality to asyncio actors in Python.
### API
```python
import ray


@ray.remote(concurrency_groups={"io": 2, "compute": 4})
class AsyncActor:
    def __init__(self):
        pass

    @ray.method(concurrency_group="io")
    async def f1(self):
        pass

    @ray.method(concurrency_group="io")
    def f2(self):
        pass

    @ray.method(concurrency_group="compute")
    def f3(self):
        pass

    @ray.method(concurrency_group="compute")
    def f4(self):
        pass

    def f5(self):
        pass
```
The `@ray.remote` decorator on `AsyncActor` declares two concurrency groups, `io` and `compute`, with max concurrencies of 2 and 4 respectively. The actor also has a default concurrency group. Each concurrency group has its own asyncio event loop and its own Python thread, which execute the methods assigned to that group.
`f1` and `f2` are invoked in the `io` concurrency group; `f3` and `f4` are invoked in `compute`.
Note that `f5` and `__init__` are invoked in the default concurrency group.
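A concurrency group can also be specified at call time. The call below (where `a` is a handle to an `AsyncActor`) invokes `f2` in the `compute` concurrency group, because the call-time option takes priority over the `@ray.method` annotation.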
```python
a.f2.options(concurrency_group="compute").remote()
```
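The priority rule above can be sketched in plain Python. This is only an illustration of the lookup order, not Ray's actual code: `resolve_concurrency_group` and the `"default"` group name are hypothetical names introduced for this example.

```python
from typing import Optional

# Hypothetical name for the default concurrency group, used only in this sketch.
DEFAULT_GROUP = "default"


def resolve_concurrency_group(
    call_option: Optional[str],
    method_annotation: Optional[str],
) -> str:
    """Pick the group for one call: call-time option, then annotation, then default."""
    if call_option is not None:
        return call_option
    if method_annotation is not None:
        return method_annotation
    return DEFAULT_GROUP


# f2 called with .options(concurrency_group="compute"): the call-time option wins.
print(resolve_concurrency_group("compute", "io"))
# f2 called normally: the @ray.method annotation applies.
print(resolve_concurrency_group(None, "io"))
# f5 has no annotation and no call-time option: it falls back to the default group.
print(resolve_concurrency_group(None, None))
```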
### Implementation
The main implementation changes are:
- Previously, an asyncio actor had a single event loop bound to a single Python thread. Now each concurrency group of the asyncio actor gets its own event loop bound to its own Python thread.
- Previously, an asyncio actor kept one fiber state per caller. Now it keeps one `FiberStateManager` per caller, and the `FiberStateManager` manages the fiber states for the concurrency groups.
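The first point can be illustrated with the standard library alone. The sketch below is not the code in this PR: `GroupLoop` is a hypothetical helper, the semaphore is one assumed way to enforce a per-group limit, and the limit of 1 for the `default` group is an arbitrary value picked for the example.

```python
import asyncio
import threading


class GroupLoop:
    """Hypothetical helper: one event loop bound to one dedicated thread."""

    def __init__(self, name, max_concurrency):
        self.loop = asyncio.new_event_loop()
        self.max_concurrency = max_concurrency
        self.semaphore = None  # created on the loop's own thread in _run()
        self.thread = threading.Thread(
            target=self._run, name=f"group-{name}", daemon=True
        )
        self.thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        # Caps how many coroutines of this group run at the same time.
        self.semaphore = asyncio.Semaphore(self.max_concurrency)
        self.loop.run_forever()

    def submit(self, coro_fn, *args):
        """Schedule coro_fn(*args) on this group's loop; returns a concurrent future."""

        async def limited():
            async with self.semaphore:
                return await coro_fn(*args)

        return asyncio.run_coroutine_threadsafe(limited(), self.loop)

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


async def where_am_i():
    await asyncio.sleep(0)
    return threading.current_thread().name


# The "default" limit of 1 is an arbitrary value chosen for this sketch.
limits = {"io": 2, "compute": 4, "default": 1}
groups = {name: GroupLoop(name, limit) for name, limit in limits.items()}
threads = {name: g.submit(where_am_i).result(timeout=5) for name, g in groups.items()}
for g in groups.values():
    g.stop()
print(threads)
```

Each coroutine reports the thread of the group it was submitted to, so a slow method in one group does not block the event loop of another.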
## Related issue number
#16047
## Why are these changes needed?
Currently, the failure signal handler registered in the Python worker is skipped on crashes such as segfaults, because the C++ core worker overrides the failure signal handler and does not call the previously registered one. This prevents the Python stack trace from being printed on crashes. The fix is to make the C++ failure signal handler call the signal handler previously registered in Python. For example, take the script below, which segfaults:
```python
import ray

ray.init()

@ray.remote
def f():
    import ctypes
    ctypes.string_at(0)

ray.get(f.remote())
```
Ray currently only prints the following stack trace:
```
(pid=26693) *** SIGSEGV received at time=1634418743 ***
(pid=26693) PC: @ 0x7fff203d9552 (unknown) _platform_strlen
(pid=26693) [2021-10-16 14:12:23,331 E 26693 12194577] logging.cc:313: *** SIGSEGV received at time=1634418743 ***
(pid=26693) [2021-10-16 14:12:23,331 E 26693 12194577] logging.cc:313: PC: @ 0x7fff203d9552 (unknown) _platform_strlen
```
With this change, the Python stack trace is printed in addition to the stack trace above:
```
(pid=26693) Fatal Python error: Segmentation fault
(pid=26693)
(pid=26693) Stack (most recent call first):
(pid=26693) File "/Users/mwtian/opt/anaconda3/envs/ray/lib/python3.7/ctypes/__init__.py", line 505 in string_at
(pid=26693) File "stack.py", line 7 in f
(pid=26693) File "/Users/mwtian/work/ray-project/ray/python/ray/worker.py", line 425 in main_loop
(pid=26693) File "/Users/mwtian/work/ray-project/ray/python/ray/workers/default_worker.py", line 212 in <module>
```
This should make debugging crashes in the Python worker easier, for both users and Ray developers.
This PR also tries to initialize the symbolizer in the GCS, Raylet, and core worker. This is a no-op on macOS and in some Linux environments (e.g. Ray on Ubuntu 20.04 already produces symbolized stack traces), but it should make Ray more likely to produce symbolized stack traces on other platforms.
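The handler chaining described above can be illustrated in pure Python. The actual fix lives in the C++ core worker; the sketch below only shows the idea of a later handler remembering and calling the earlier one. `python_handler` and `install_chaining_handler` are hypothetical names, and `SIGUSR1` stands in for a fault signal so the example can run safely (POSIX only, Python 3.8+ for `signal.raise_signal`).

```python
import signal

calls = []


def python_handler(signum, frame):
    # Stands in for the handler registered first (the Python worker's).
    calls.append("python")


def install_chaining_handler(signum):
    # Remember whatever handler was registered before overriding it.
    previous = signal.getsignal(signum)

    def handler(sig, frame):
        # Stands in for the handler installed later (the core worker's).
        calls.append("core")
        # Forward to the earlier handler instead of silently dropping it.
        if callable(previous):
            previous(sig, frame)

    signal.signal(signum, handler)


original = signal.getsignal(signal.SIGUSR1)
signal.signal(signal.SIGUSR1, python_handler)
install_chaining_handler(signal.SIGUSR1)
signal.raise_signal(signal.SIGUSR1)
signal.signal(signal.SIGUSR1, original)  # restore the handler that was there before
print(calls)
```

Without the forwarding call inside `handler`, only the later handler would run, which mirrors the behavior this PR fixes.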
## Why are these changes needed?
## Related issue number
#19177
Quoting #19177 (comment) here,
The following tests fail when not skipped,
```
=================================== short test summary info ====================================
FAILED python\ray\tests\test_basic.py::test_user_setup_function - subprocess.CalledProcessErro...
FAILED python\ray\tests\test_basic.py::test_disable_cuda_devices - subprocess.CalledProcessErr...
FAILED python\ray\tests\test_basic.py::test_wait_timing - assert (1634209333.6099107 - 1634209...
Results (395.22s):
36 passed
3 failed
- ray\tests/test_basic.py:197 test_user_setup_function
- ray\tests/test_basic.py:220 test_disable_cuda_devices
- ray\tests/test_basic.py:265 test_wait_timing

=================================== short test summary info ====================================
FAILED python\ray\tests\test_basic_3.py::test_fair_queueing - AssertionError: 23
Results (198.33s):
1 failed
- ray\tests/test_basic_3.py:169 test_fair_queueing
```
The following test passed when not skipped. Opening a PR to verify that.
`def test_oversized_function(ray_start_shared_local_modes)`