mirror of
https://github.com/vale981/ray
synced 2025-03-06 02:21:39 -05:00
Fix crash caused by unit test testWaitAndCrash
(#6549)
This commit is contained in:
parent
e556b729c2
commit
af9f76359a
3 changed files with 44 additions and 23 deletions
|
@ -141,6 +141,11 @@ public final class RayNativeRuntime extends AbstractRayRuntime {
|
|||
LOGGER.info("RayNativeRuntime shutdown");
|
||||
}
|
||||
|
||||
// For test purpose only
|
||||
public RunManager getRunManager() {
|
||||
return manager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResource(String resourceName, double capacity, UniqueId nodeId) {
|
||||
Preconditions.checkArgument(Double.compare(capacity, 0) >= 0);
|
||||
|
|
|
@ -57,30 +57,39 @@ public class RunManager {
|
|||
|
||||
for (int i = processes.size() - 1; i >= 0; --i) {
|
||||
Pair<String, Process> pair = processes.get(i);
|
||||
String name = pair.getLeft();
|
||||
Process p = pair.getRight();
|
||||
|
||||
int numAttempts = 0;
|
||||
while (p.isAlive()) {
|
||||
if (numAttempts == 0) {
|
||||
LOGGER.debug("Terminating process {}.", name);
|
||||
p.destroy();
|
||||
} else {
|
||||
LOGGER.debug("Terminating process {} forcibly.", name);
|
||||
p.destroyForcibly();
|
||||
}
|
||||
try {
|
||||
p.waitFor(KILL_PROCESS_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.warn("Got InterruptedException while waiting for process {}" +
|
||||
" to be terminated.", processes.get(i));
|
||||
}
|
||||
numAttempts++;
|
||||
}
|
||||
LOGGER.info("Process {} is now terminated.", name);
|
||||
terminateProcess(pair.getLeft(), pair.getRight());
|
||||
}
|
||||
}
|
||||
|
||||
public void terminateProcess(String name, Process p) {
|
||||
int numAttempts = 0;
|
||||
while (p.isAlive()) {
|
||||
if (numAttempts == 0) {
|
||||
LOGGER.debug("Terminating process {}.", name);
|
||||
p.destroy();
|
||||
} else {
|
||||
LOGGER.debug("Terminating process {} forcibly.", name);
|
||||
p.destroyForcibly();
|
||||
}
|
||||
try {
|
||||
p.waitFor(KILL_PROCESS_WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.warn("Got InterruptedException while waiting for process {}" +
|
||||
" to be terminated.", name);
|
||||
}
|
||||
numAttempts++;
|
||||
}
|
||||
LOGGER.info("Process {} is now terminated.", name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get processes by name. For test purposes only.
|
||||
*/
|
||||
public List<Process> getProcesses(String name) {
|
||||
return processes.stream().filter(pair -> pair.getLeft().equals(name)).map(Pair::getRight)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private void createTempDirs() {
|
||||
try {
|
||||
FileUtils.forceMkdir(new File(rayConfig.logDir));
|
||||
|
@ -258,7 +267,8 @@ public class RunManager {
|
|||
String.format("--raylet_socket_name=%s", rayConfig.rayletSocketName),
|
||||
String.format("--store_socket_name=%s", rayConfig.objectStoreSocketName),
|
||||
String.format("--object_manager_port=%d", 0), // The object manager port.
|
||||
String.format("--node_manager_port=%d", rayConfig.getNodeManagerPort()), // The node manager port.
|
||||
// The node manager port.
|
||||
String.format("--node_manager_port=%d", rayConfig.getNodeManagerPort()),
|
||||
String.format("--node_ip_address=%s", rayConfig.nodeIp),
|
||||
String.format("--redis_address=%s", rayConfig.getRedisIp()),
|
||||
String.format("--redis_port=%d", rayConfig.getRedisPort()),
|
||||
|
|
|
@ -7,7 +7,9 @@ import org.ray.api.RayObject;
|
|||
import org.ray.api.TestUtils;
|
||||
import org.ray.api.exception.RayException;
|
||||
import org.ray.api.id.ObjectId;
|
||||
import org.ray.runtime.RayNativeRuntime;
|
||||
import org.ray.runtime.object.RayObjectImpl;
|
||||
import org.ray.runtime.runner.RunManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testng.Assert;
|
||||
|
@ -26,7 +28,11 @@ public class ClientExceptionTest extends BaseTest {
|
|||
Thread thread = new Thread(() -> {
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
Ray.shutdown();
|
||||
// kill raylet
|
||||
RunManager runManager = ((RayNativeRuntime) TestUtils.getRuntime()).getRunManager();
|
||||
for (Process process : runManager.getProcesses("raylet")) {
|
||||
runManager.terminateProcess("raylet", process);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error("Got InterruptedException when sleeping, exit right now.");
|
||||
throw new RuntimeException("Got InterruptedException when sleeping.", e);
|
||||
|
|
Loading…
Add table
Reference in a new issue