[Core][ConcurrencyGroup] Fix blocking task in default group block tasks in other group. (#20525)

Why are these changes needed?
If max concurrency is 1 in default group, a blocking task executing in default group will block the following tasks in different group. See reproduction script in #20475

The issue is due to tasks executing in the default concurrent group run in the main task execution thread, and tasks in other concurrent groups will be blocked if the main task execution thread is blocked.

This PR only changes concurrent actor behavior that default group will not block other groups.

Related issue number
Fix #20475
This commit is contained in:
Qing Wang 2021-11-25 14:24:17 +08:00 committed by GitHub
parent d725457c9f
commit cd2b83a259
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 3 deletions

View file

@ -6,6 +6,7 @@ import io.ray.api.Ray;
import io.ray.api.concurrencygroup.ConcurrencyGroup;
import io.ray.api.concurrencygroup.ConcurrencyGroupBuilder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.Test;
@ -159,4 +160,33 @@ public class ConcurrencyGroupTest extends BaseTest {
Assert.assertTrue(ret7.get());
Assert.assertTrue(ret8.get());
}
private static class ConcurrencyActor2 {
public String f1() throws InterruptedException {
TimeUnit.MINUTES.sleep(100);
return "never returned";
}
public String f2() {
return "ok";
}
}
/// This case tests that blocking task in default group will block other groups.
/// See https://github.com/ray-project/ray/issues/20475
@Test(groups = {"cluster"})
public void testDefaultCgDoNotBlockOthers() {
ConcurrencyGroup group =
new ConcurrencyGroupBuilder<ConcurrencyActor2>()
.setName("group")
.setMaxConcurrency(1)
.addMethod(ConcurrencyActor2::f2)
.build();
ActorHandle<ConcurrencyActor2> myActor =
Ray.actor(ConcurrencyActor2::new).setConcurrencyGroups(group).remote();
myActor.task(ConcurrencyActor2::f1).remote();
Assert.assertEquals(myActor.task(ConcurrencyActor2::f2).remote().get(), "ok");
}
}

View file

@ -56,9 +56,12 @@ PoolManager::PoolManager(const std::vector<ConcurrencyGroup> &concurrency_groups
}
name_to_thread_pool_index_[name] = pool;
}
// If max concurrency of default group is 1, the tasks of default group
// will be performed in main thread instead of any executor pool.
if (default_group_max_concurrency > 1) {
// If max concurrency of default group is 1 and there is no other concurrency group of
// this actor, the tasks of default group will be performed in main thread instead of
// any executor pool, otherwise tasks in any concurrency group should be performed in
// the thread pools instead of main thread.
if (default_group_max_concurrency > 1 || !concurrency_groups.empty()) {
/// The concurrency group is enabled.
default_thread_pool_ =
std::make_shared<BoundedExecutor>(default_group_max_concurrency);
}