[streaming] Use enum to define resource type. (#7813)

This commit is contained in:
Tianyi Chen 2020-03-31 00:03:49 +08:00 committed by GitHub
parent eb61036ba2
commit f889f938e5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 46 additions and 20 deletions

View file

@ -7,16 +7,6 @@ import org.ray.streaming.runtime.config.Config;
*/
public interface ResourceConfig extends Config {
/**
* CPU definition key of resource management.
*/
String RESOURCE_KEY_CPU = "CPU";
/**
* Memory definition key of resource management.
*/
String RESOURCE_KEY_MEM = "MEM";
/**
* Number of actors per container.
*/

View file

@ -9,6 +9,7 @@ import java.util.Map;
import org.ray.api.RayActor;
import org.ray.api.id.ActorId;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.core.resource.ResourceType;
import org.ray.streaming.runtime.core.resource.Slot;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.ray.streaming.runtime.worker.JobWorker;
@ -138,10 +139,10 @@ public class ExecutionVertex implements Serializable {
Map<String, Double> resourceMap = new HashMap<>();
ResourceConfig resourceConfig = runtimeContext.getConf().masterConfig.resourceConfig;
if (resourceConfig.isTaskCpuResourceLimit()) {
resourceMap.put(ResourceConfig.RESOURCE_KEY_CPU, resourceConfig.taskCpuResource());
resourceMap.put(ResourceType.CPU.name(), resourceConfig.taskCpuResource());
}
if (resourceConfig.isTaskMemResourceLimit()) {
resourceMap.put(ResourceConfig.RESOURCE_KEY_MEM, resourceConfig.taskMemResource());
resourceMap.put(ResourceType.MEM.name(), resourceConfig.taskMemResource());
}
return resourceMap;
}

View file

@ -0,0 +1,33 @@
package org.ray.streaming.runtime.core.resource;
/**
* Key for different type of resources.
*/
public enum ResourceType {
/**
*Cpu resource key.
*/
CPU("CPU"),
/**
*Gpu resource key.
*/
GPU("GPU"),
/**
* Memory resource key.
*/
MEM("MEM");
private String value;
ResourceType(String value) {
this.value = value;
}
public String getValue() {
return value;
}
}

View file

@ -13,6 +13,7 @@ import org.ray.streaming.runtime.config.StreamingConfig;
import org.ray.streaming.runtime.config.global.CommonConfig;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.resource.ResourceType;
import org.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import org.ray.streaming.runtime.master.resourcemanager.ResourceManagerImpl;
import org.ray.streaming.runtime.master.scheduler.strategy.SlotAssignStrategy;
@ -61,8 +62,8 @@ public class ResourceManagerTest extends BaseUnitTest {
Assert.assertTrue(slotAssignStrategy instanceof PipelineFirstStrategy);
Map<String, Double> containerResource = new HashMap<>();
containerResource.put(ResourceConfig.RESOURCE_KEY_CPU, 16.0);
containerResource.put(ResourceConfig.RESOURCE_KEY_MEM, 128.0);
containerResource.put(ResourceType.CPU.name(), 16.0);
containerResource.put(ResourceType.MEM.name(), 128.0);
Container container1 = new Container(null, "testAddress1", "testHostName1");
container1.setAvailableResource(containerResource);
Container container2 = new Container(null, "testAddress2", "testHostName2");
@ -91,9 +92,9 @@ public class ResourceManagerTest extends BaseUnitTest {
Map<String, Double> resource = resourceManager.allocateResource(container, vertex.getResources());
Assert.assertNotNull(resource);
});
Assert.assertEquals(container1.getAvailableResource().get(ResourceConfig.RESOURCE_KEY_CPU), 14.0);
Assert.assertEquals(container2.getAvailableResource().get(ResourceConfig.RESOURCE_KEY_CPU), 14.0);
Assert.assertEquals(container1.getAvailableResource().get(ResourceConfig.RESOURCE_KEY_MEM), 118.0);
Assert.assertEquals(container2.getAvailableResource().get(ResourceConfig.RESOURCE_KEY_MEM), 118.0);
Assert.assertEquals(container1.getAvailableResource().get(ResourceType.CPU.name()), 14.0);
Assert.assertEquals(container2.getAvailableResource().get(ResourceType.CPU.name()), 14.0);
Assert.assertEquals(container1.getAvailableResource().get(ResourceType.MEM.name()), 118.0);
Assert.assertEquals(container2.getAvailableResource().get(ResourceType.MEM.name()), 118.0);
}
}

View file

@ -14,6 +14,7 @@ import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.config.StreamingConfig;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.resource.ResourceType;
import org.ray.streaming.runtime.master.scheduler.strategy.SlotAssignStrategy;
import org.ray.streaming.runtime.master.scheduler.strategy.impl.PipelineFirstStrategy;
import org.ray.streaming.runtime.core.resource.Container;
@ -48,8 +49,8 @@ public class PipelineFirstStrategyTest extends BaseUnitTest {
Resources resources = new Resources(resourceConfig);
Map<String, Double> containerResource = new HashMap<>();
containerResource.put(ResourceConfig.RESOURCE_KEY_CPU, 16.0);
containerResource.put(ResourceConfig.RESOURCE_KEY_MEM, 128.0);
containerResource.put(ResourceType.CPU.name(), 16.0);
containerResource.put(ResourceType.MEM.name(), 128.0);
for (int i = 0; i < 2; ++i) {
UniqueId uniqueId = UniqueId.randomId();
Container container = new Container(uniqueId, "1.1.1." + i, "localhost" + i);