Streaming scheduler resourcemanager (#7070)

This commit is contained in:
JianZhangYang 2020-02-24 19:32:50 +08:00 committed by GitHub
parent 0ae4fe020d
commit 7e115490d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 1319 additions and 6 deletions

View file

@ -1,6 +1,8 @@
package org.ray.streaming.runtime.config;
import java.util.Map;
import org.aeonbits.owner.ConfigFactory;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -11,7 +13,10 @@ public class StreamingMasterConfig extends StreamingGlobalConfig {
private static final Logger LOG = LoggerFactory.getLogger(StreamingMasterConfig.class);
public ResourceConfig resourceConfig;
/**
* Creates the master config from the raw job configuration.
*
* @param conf raw key/value configuration; it is passed to the parent config constructor and
* used to build {@link #resourceConfig}.
*/
public StreamingMasterConfig(final Map<String, String> conf) {
super(conf);
// Build the typed ResourceConfig view over the same key/value map via the OWNER ConfigFactory.
this.resourceConfig = ConfigFactory.create(ResourceConfig.class, conf);
}
}

View file

@ -0,0 +1,92 @@
package org.ray.streaming.runtime.config.master;
import org.ray.streaming.runtime.config.Config;
/**
* Job resource management config.
*
* <p>Declares the resource map keys, the configuration property names and the typed accessors
* (each with a default value) for resource management settings.
*/
public interface ResourceConfig extends Config {
/**
* Resource map key under which CPU is accounted.
*/
String RESOURCE_KEY_CPU = "CPU";
/**
* Resource map key under which memory is accounted.
*/
String RESOURCE_KEY_MEM = "MEM";
/**
* Property name: maximum number of actors per container.
*/
String MAX_ACTOR_NUM_PER_CONTAINER = "streaming.container.per.max.actor";
/**
* Property name: interval, in seconds, between two checks of the ray cluster nodes.
* NOTE(review): the lowercase 'k' in this constant's name is a typo; it is kept because
* renaming a public constant would break existing references.
*/
String CONTAINER_RESOURCE_CHECk_INTERVAL_SECOND = "streaming.resource.check.interval.second";
/**
* Property name: CPU used by each task.
*/
String TASK_RESOURCE_CPU = "streaming.task.resource.cpu";
/**
* Property name: memory used by each task.
*/
String TASK_RESOURCE_MEM = "streaming.task.resource.mem";
/**
* Property name: whether to enable the CPU limit in resource control.
*/
String TASK_RESOURCE_CPU_LIMIT_ENABLE = "streaming.task.resource.cpu.limitation.enable";
/**
* Property name: whether to enable the memory limit in resource control.
*/
String TASK_RESOURCE_MEM_LIMIT_ENABLE = "streaming.task.resource.mem.limitation.enable";
/**
* CPU required by each task.
*
* @return value of {@link #TASK_RESOURCE_CPU}; defaults to 1.0.
*/
@DefaultValue(value = "1.0")
@Key(value = TASK_RESOURCE_CPU)
double taskCpuResource();
/**
* Memory required by each task.
*
* @return value of {@link #TASK_RESOURCE_MEM}; defaults to 2.0 (the unit is not stated here).
*/
@DefaultValue(value = "2.0")
@Key(value = TASK_RESOURCE_MEM)
double taskMemResource();
/**
* Whether the CPU limit is enabled in resource control.
*
* @return value of {@link #TASK_RESOURCE_CPU_LIMIT_ENABLE}; defaults to true.
*/
@DefaultValue(value = "true")
@Key(value = TASK_RESOURCE_CPU_LIMIT_ENABLE)
boolean isTaskCpuResourceLimit();
/**
* Whether the memory limit is enabled in resource control.
*
* @return value of {@link #TASK_RESOURCE_MEM_LIMIT_ENABLE}; defaults to true.
*/
@DefaultValue(value = "true")
@Key(value = TASK_RESOURCE_MEM_LIMIT_ENABLE)
boolean isTaskMemResourceLimit();
/**
* Maximum number of actors per container.
*
* @return value of {@link #MAX_ACTOR_NUM_PER_CONTAINER}; defaults to 500.
*/
@DefaultValue(value = "500")
@Key(MAX_ACTOR_NUM_PER_CONTAINER)
int maxActorNumPerContainer();
/**
* The interval between two checks of the ray cluster nodes.
*
* @return value of {@link #CONTAINER_RESOURCE_CHECk_INTERVAL_SECOND} in seconds; defaults to 1.
*/
@DefaultValue(value = "1")
@Key(value = CONTAINER_RESOURCE_CHECk_INTERVAL_SECOND)
long resourceCheckIntervalSecond();
}

View file

@ -0,0 +1,21 @@
package org.ray.streaming.runtime.config.types;
/**
 * Slot assign strategy types, each carrying a configuration name and a numeric index.
 */
public enum SlotAssignStrategyType {

  /**
   * Resource scheduling strategy based on FF(First Fit) algorithm and pipeline.
   */
  PIPELINE_FIRST_STRATEGY("pipeline_first_strategy", 0);

  // Enum constants are shared singletons, so their state is immutable.
  private final String value;
  private final int index;

  SlotAssignStrategyType(String value, int index) {
    this.value = value;
    this.index = index;
  }

  /**
   * Returns the name of this strategy type.
   *
   * @return the strategy name, e.g. {@code "pipeline_first_strategy"}.
   */
  public String getValue() {
    return value;
  }

  /**
   * Returns the numeric index given to this strategy type at declaration.
   *
   * @return the strategy index.
   */
  public int getIndex() {
    return index;
  }
}

View file

@ -0,0 +1,123 @@
package org.ray.streaming.runtime.core.common;
import java.io.Serializable;
import java.util.Random;
/**
 * Streaming system unique identity base class.
 * For example, {@link org.ray.streaming.runtime.core.resource.ContainerID}.
 *
 * <p>An ID consists of two longs (lower part and upper part). Its byte form is {@link #SIZE}
 * bytes: the lower part followed by the upper part, each written most significant byte first.
 */
public class AbstractID implements Comparable<AbstractID>, Serializable {

  private static final long serialVersionUID = 1L;
  private static final Random RANDOM = new Random();
  private static final int SIZE_OF_LONG = 8;
  private static final int SIZE_OF_UPPER_PART = 8;
  private static final int SIZE_OF_LOWER_PART = 8;

  // lowerPart(long type) + upperPart(long type)
  public static final int SIZE = SIZE_OF_UPPER_PART + SIZE_OF_LOWER_PART;

  protected final long upperPart;
  protected final long lowerPart;

  // Lazily computed cache of toString(); derived state, so it is not serialized.
  private transient String toString;

  /**
   * Creates an ID from its byte form, as produced by {@link #getBytes()}.
   *
   * @param bytes array of exactly {@link #SIZE} bytes.
   * @throws IllegalArgumentException if {@code bytes} is null or has a different length.
   */
  public AbstractID(byte[] bytes) {
    if (bytes == null || bytes.length != SIZE) {
      throw new IllegalArgumentException("Argument bytes must be an array of " + SIZE + " bytes");
    }
    this.lowerPart = byteArrayToLong(bytes, 0);
    this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG);
  }

  /**
   * Creates an ID from its two parts.
   *
   * @param lowerPart lower part of the ID.
   * @param upperPart upper part of the ID.
   */
  public AbstractID(long lowerPart, long upperPart) {
    this.lowerPart = lowerPart;
    this.upperPart = upperPart;
  }

  /**
   * Creates a copy of the given ID.
   *
   * @param id ID to copy.
   * @throws IllegalArgumentException if {@code id} is null.
   */
  public AbstractID(AbstractID id) {
    if (id == null) {
      throw new IllegalArgumentException("Id must not be null.");
    }
    this.lowerPart = id.lowerPart;
    this.upperPart = id.upperPart;
  }

  /**
   * Creates a random ID.
   */
  public AbstractID() {
    this.lowerPart = RANDOM.nextLong();
    this.upperPart = RANDOM.nextLong();
  }

  public long getLowerPart() {
    return lowerPart;
  }

  public long getUpperPart() {
    return upperPart;
  }

  /**
   * Returns the byte form of this ID.
   *
   * @return a new array of {@link #SIZE} bytes: lower part first, then upper part.
   */
  public byte[] getBytes() {
    byte[] bytes = new byte[SIZE];
    longToByteArray(lowerPart, bytes, 0);
    longToByteArray(upperPart, bytes, SIZE_OF_LONG);
    return bytes;
  }

  /**
   * Two IDs are equal when they are of exactly the same class and both parts match.
   */
  @Override
  public boolean equals(Object obj) {
    if (obj == this) {
      return true;
    }
    if (obj == null || obj.getClass() != getClass()) {
      return false;
    }
    AbstractID that = (AbstractID) obj;
    return that.lowerPart == this.lowerPart && that.upperPart == this.upperPart;
  }

  @Override
  public int hashCode() {
    // Long.hashCode folds the high and low 32 bits of each part with XOR.
    return Long.hashCode(this.lowerPart) ^ Long.hashCode(this.upperPart);
  }

  /**
   * Returns the ID as 32 lowercase hex digits, in the same byte order as {@link #getBytes()}.
   *
   * <p>Hex is used because decoding raw ID bytes as text is charset-dependent and lossy:
   * distinct IDs could otherwise render to the same, partly unprintable, string.
   */
  @Override
  public String toString() {
    if (this.toString == null) {
      // %x prints a long as its unsigned two's-complement value; 016 pads to 8 bytes.
      this.toString = String.format("%016x%016x", this.lowerPart, this.upperPart);
    }
    return this.toString;
  }

  /**
   * Orders IDs by upper part first, then by lower part (signed comparison).
   */
  @Override
  public int compareTo(AbstractID abstractID) {
    int upperDiff = Long.compare(this.upperPart, abstractID.upperPart);
    if (upperDiff != 0) {
      return upperDiff;
    }
    return Long.compare(this.lowerPart, abstractID.lowerPart);
  }

  /**
   * Reads 8 bytes starting at {@code offset} as a long, most significant byte first.
   */
  private static long byteArrayToLong(byte[] begin, int offset) {
    long longNum = 0;
    for (int i = 0; i < SIZE_OF_LONG; ++i) {
      longNum |= (begin[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3);
    }
    return longNum;
  }

  /**
   * Writes {@code longNum} into 8 bytes starting at {@code offset}, most significant byte first.
   */
  private static void longToByteArray(long longNum, byte[] byteArray, int offset) {
    for (int i = 0; i < SIZE_OF_LONG; ++i) {
      final int shift = i << 3; // i * 8
      byteArray[offset + SIZE_OF_LONG - 1 - i] = (byte) ((longNum & (0xffL << shift)) >>> shift);
    }
  }
}

View file

@ -58,7 +58,6 @@ public class ExecutionEdge implements Serializable {
public String toString() {
// Summarizes the edge by its source vertex, target vertex and edge index.
return MoreObjects.toStringHelper(this)
.add("srcVertex", sourceVertex)
.add("targetVertex", targetVertex)
.add("executionEdgeIndex", executionEdgeIndex)
.toString();
}

View file

@ -10,6 +10,7 @@ import org.ray.api.RayActor;
import org.ray.streaming.jobgraph.JobVertex;
import org.ray.streaming.jobgraph.VertexType;
import org.ray.streaming.operator.StreamOperator;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.ray.streaming.runtime.worker.JobWorker;
/**
@ -34,15 +35,18 @@ public class ExecutionJobVertex {
private int parallelism;
private List<ExecutionVertex> executionVertices;
private JobRuntimeContext runtimeContext;
private List<ExecutionJobEdge> inputEdges = new ArrayList<>();
private List<ExecutionJobEdge> outputEdges = new ArrayList<>();
public ExecutionJobVertex(JobVertex jobVertex) {
/**
* Builds the job-level execution vertex from a job graph vertex.
*
* @param jobVertex job graph vertex providing the id, stream operator, vertex type and
* parallelism.
* @param runtimeContext job runtime context, later returned by getRuntimeContext().
*/
public ExecutionJobVertex(JobVertex jobVertex, JobRuntimeContext runtimeContext) {
this.jobVertexId = jobVertex.getVertexId();
this.jobVertexName = generateVertexName(jobVertexId, jobVertex.getStreamOperator());
this.streamOperator = jobVertex.getStreamOperator();
this.vertexType = jobVertex.getVertexType();
this.parallelism = jobVertex.getParallelism();
this.runtimeContext = runtimeContext;
// NOTE(review): createExecutionVertics() is not visible here; presumably it builds the
// ExecutionVertex instances, which read the runtime context from this object, so the
// assignment above must stay before this call - confirm before reordering.
this.executionVertices = createExecutionVertics();
}
@ -133,6 +137,10 @@ public class ExecutionJobVertex {
return getVertexType() == VertexType.SINK;
}
/**
* Returns the job runtime context held by this job vertex.
*
* @return the runtime context stored in the {@code runtimeContext} field.
*/
public JobRuntimeContext getRuntimeContext() {
return runtimeContext;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)

View file

@ -3,9 +3,14 @@ package org.ray.streaming.runtime.core.graph.executiongraph;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.api.RayActor;
import org.ray.api.id.ActorId;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.core.resource.Slot;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.ray.streaming.runtime.worker.JobWorker;
/**
@ -27,8 +32,13 @@ public class ExecutionVertex implements Serializable {
* Unique name generated by vertex name and index for execution vertex.
*/
private final String vertexName;
/**
* Resources used by ExecutionVertex.
*/
private final Map<String, Double> resources;
private ExecutionVertexState state = ExecutionVertexState.TO_ADD;
private Slot slot;
private RayActor<JobWorker> workerActor;
private List<ExecutionEdge> inputEdges = new ArrayList<>();
private List<ExecutionEdge> outputEdges = new ArrayList<>();
@ -37,6 +47,7 @@ public class ExecutionVertex implements Serializable {
this.vertexId = generateExecutionVertexId(jobVertexId, index);
this.vertexIndex = index;
this.vertexName = executionJobVertex.getJobVertexName() + "-" + vertexIndex;
this.resources = generateResources(executionJobVertex.getRuntimeContext());
}
private int generateExecutionVertexId(int jobVertexId, int index) {
@ -105,16 +116,46 @@ public class ExecutionVertex implements Serializable {
return vertexName;
}
/**
* Returns the resources required by this execution vertex, keyed by resource name.
*
* @return the internal resource map itself (not a copy).
*/
public Map<String, Double> getResources() {
return resources;
}
/**
* Returns the slot assigned to this execution vertex.
*
* @return the current value of the {@code slot} field.
*/
public Slot getSlot() {
return slot;
}
/**
* Assigns the given slot to this execution vertex, replacing any previous one.
*
* @param slot slot to assign.
*/
public void setSlot(Slot slot) {
this.slot = slot;
}
/**
 * Assigns the given slot only when this vertex has no slot yet; an existing slot is kept.
 *
 * @param slot slot to assign if none is set.
 */
public void setSlotIfNotExist(Slot slot) {
  if (this.slot != null) {
    return;
  }
  this.slot = slot;
}
/**
 * Builds the resource requirement of this vertex from the master resource config.
 *
 * <p>CPU and memory are each included only when the corresponding limit is enabled.
 *
 * @param runtimeContext job runtime context giving access to the master resource config.
 * @return a new map from resource key to required amount; empty when both limits are disabled.
 */
private Map<String, Double> generateResources(JobRuntimeContext runtimeContext) {
  final Map<String, Double> required = new HashMap<>();
  final ResourceConfig config = runtimeContext.getConf().masterConfig.resourceConfig;

  final boolean cpuLimited = config.isTaskCpuResourceLimit();
  if (cpuLimited) {
    required.put(ResourceConfig.RESOURCE_KEY_CPU, config.taskCpuResource());
  }

  final boolean memLimited = config.isTaskMemResourceLimit();
  if (memLimited) {
    required.put(ResourceConfig.RESOURCE_KEY_MEM, config.taskMemResource());
  }

  return required;
}
@Override
public String toString() {
// Summarizes identity, required resources, state, assigned slot, worker actor and edges.
return MoreObjects.toStringHelper(this)
.add("vertexId", vertexId)
.add("vertexIndex", vertexIndex)
.add("vertexName", vertexName)
.add("resources", resources)
.add("state", state)
.add("slot", slot)
.add("workerActor", workerActor)
.add("inputEdges", inputEdges)
.add("outputEdges", outputEdges)
.toString();
}
}

View file

@ -0,0 +1,53 @@
package org.ray.streaming.runtime.core.master.resourcemanager;
import java.util.List;
import java.util.Map;
import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategy;
import org.ray.streaming.runtime.core.resource.Container;
import org.ray.streaming.runtime.core.resource.Resources;
/**
* The resource manager is responsible for resource allocation and deallocation, and for
* monitoring the ray cluster.
*/
public interface ResourceManager {
/**
* Get all registered containers as a list.
*
* @return A list of containers.
*/
List<Container> getRegisteredContainers();
/**
* Allocate resource from a container to an actor.
*
* @param container Specify the container to allocate from.
* @param requireResource Resource to be requested, keyed by resource name.
* @return Allocated resource, keyed by resource name.
*/
Map<String, Double> allocateResource(final Container container,
final Map<String, Double> requireResource);
/**
* Deallocate resource previously allocated from a container.
*
* @param container Specify the container the resource is returned to.
* @param releaseResource Resource to be released, keyed by resource name.
*/
void deallocateResource(final Container container,
final Map<String, Double> releaseResource);
/**
* Get the current slot-assign strategy from manager.
*
* @return Current slot-assign strategy.
*/
SlotAssignStrategy getSlotAssignStrategy();
/**
* Get resources from manager.
*
* @return Current resources in manager.
*/
Resources getResources();
}

View file

@ -0,0 +1,200 @@
package org.ray.streaming.runtime.core.master.resourcemanager;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.ray.api.Ray;
import org.ray.api.runtimecontext.NodeInfo;
import org.ray.streaming.runtime.config.StreamingMasterConfig;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.config.types.SlotAssignStrategyType;
import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategy;
import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategyFactory;
import org.ray.streaming.runtime.core.resource.Container;
import org.ray.streaming.runtime.core.resource.Resources;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link ResourceManager} implementation that mirrors the ray cluster nodes as containers.
 *
 * <p>A single-threaded scheduler periodically compares the nodes reported by Ray with the
 * registered containers, registering new nodes and unregistering vanished ones.
 *
 * <p>NOTE(review): the registered-container list is mutated from the scheduler thread without
 * synchronization; confirm that concurrent readers tolerate this.
 */
public class ResourceManagerImpl implements ResourceManager {

  private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerImpl.class);

  //Container used tag
  private static final String CONTAINER_ENGAGED_KEY = "CONTAINER_ENGAGED_KEY";

  /**
   * Job runtime context.
   */
  private JobRuntimeContext runtimeContext;

  /**
   * Resource related configuration.
   */
  private ResourceConfig resourceConfig;

  /**
   * Slot assign strategy.
   */
  private SlotAssignStrategy slotAssignStrategy;

  /**
   * Resource description information.
   */
  private final Resources resources;

  /**
   * Runs the periodic cluster node check.
   */
  private final ScheduledExecutorService scheduledExecutorService;

  /**
   * Creates the manager, selects the slot assign strategy and starts the periodic node check.
   *
   * @param runtimeContext job runtime context providing the master resource config.
   */
  public ResourceManagerImpl(JobRuntimeContext runtimeContext) {
    this.runtimeContext = runtimeContext;
    StreamingMasterConfig masterConfig = runtimeContext.getConf().masterConfig;

    this.resourceConfig = masterConfig.resourceConfig;
    this.resources = new Resources(resourceConfig);
    LOG.info("ResourceManagerImpl begin init, conf is {}, resources are {}.",
        resourceConfig, resources);

    SlotAssignStrategyType slotAssignStrategyType = SlotAssignStrategyType.PIPELINE_FIRST_STRATEGY;
    this.slotAssignStrategy = SlotAssignStrategyFactory.getStrategy(slotAssignStrategyType);
    this.slotAssignStrategy.setResources(resources);
    LOG.info("Slot assign strategy: {}.", slotAssignStrategy.getName());

    this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
    long intervalSecond = resourceConfig.resourceCheckIntervalSecond();
    // Schedule the guarded variant: an exception escaping a fixed-rate task would silently
    // cancel every later run, so node changes would never be picked up again.
    this.scheduledExecutorService.scheduleAtFixedRate(
        Ray.wrapRunnable(this::checkAndUpdateResourcesSafely), 0, intervalSecond,
        TimeUnit.SECONDS);

    LOG.info("ResourceManagerImpl init success.");
  }

  /**
   * Subtracts the required amounts from the container's available resources.
   *
   * <p>Only resource keys present in both maps are touched. No check is made that enough
   * resource is available, so an available amount may become negative.
   *
   * @param container container to allocate from.
   * @param requireResource required amount per resource key.
   * @return the amounts actually allocated, per resource key.
   */
  @Override
  public Map<String, Double> allocateResource(final Container container,
      final Map<String, Double> requireResource) {
    LOG.info("Start to allocate resource for actor with container: {}.", container);

    // allocate resource to actor
    Map<String, Double> allocated = new HashMap<>();
    Map<String, Double> containResource = container.getAvailableResource();
    for (Map.Entry<String, Double> entry : containResource.entrySet()) {
      if (requireResource.containsKey(entry.getKey())) {
        double required = requireResource.get(entry.getKey());
        entry.setValue(entry.getValue() - required);
        allocated.put(entry.getKey(), required);
      }
    }

    LOG.info("Allocate resource: {} to container {}.", requireResource, container);
    return allocated;
  }

  /**
   * Adds the released amounts back to the container's available resources.
   *
   * <p>Only resource keys present in both maps are touched.
   *
   * @param container container the resource is returned to.
   * @param releaseResource released amount per resource key.
   */
  @Override
  public void deallocateResource(final Container container,
      final Map<String, Double> releaseResource) {
    LOG.info("Deallocating resource for container {}.", container);

    Map<String, Double> containResource = container.getAvailableResource();
    for (Map.Entry<String, Double> entry : containResource.entrySet()) {
      if (releaseResource.containsKey(entry.getKey())) {
        double released = releaseResource.get(entry.getKey());
        LOG.info("Release source {}:{}", entry.getKey(), released);
        entry.setValue(entry.getValue() + released);
      }
    }

    LOG.info("Deallocated resource for container {} success.", container);
  }

  /**
   * Returns a snapshot copy of the registered containers.
   */
  @Override
  public List<Container> getRegisteredContainers() {
    return new ArrayList<>(resources.getRegisterContainers());
  }

  @Override
  public SlotAssignStrategy getSlotAssignStrategy() {
    return slotAssignStrategy;
  }

  @Override
  public Resources getResources() {
    return this.resources;
  }

  /**
   * Runs one node check and logs any failure instead of propagating it, so that the
   * fixed-rate schedule keeps running.
   */
  private void checkAndUpdateResourcesSafely() {
    try {
      checkAndUpdateResources();
    } catch (Exception e) {
      LOG.error("Failed to check and update resources, will retry at next interval.", e);
    }
  }

  /**
   * Check the status of ray cluster node and update the internal resource information of
   * streaming system.
   */
  private void checkAndUpdateResources() {
    // get all started nodes
    List<NodeInfo> latestNodeInfos = Ray.getRuntimeContext().getAllNodeInfo();
    List<Container> registered = resources.getRegisterContainers();

    // Nodes reported by Ray that have no registered container yet.
    List<NodeInfo> addNodes = latestNodeInfos.stream()
        .filter(nodeInfo -> registered.stream()
            .noneMatch(container -> container.getNodeId().equals(nodeInfo.nodeId)))
        .collect(Collectors.toList());

    // Registered containers whose node is no longer reported by Ray.
    List<Container> deleteContainers = registered.stream()
        .filter(container -> latestNodeInfos.stream()
            .noneMatch(nodeInfo -> nodeInfo.nodeId.equals(container.getNodeId())))
        .collect(Collectors.toList());

    LOG.info("Latest node infos: {}, current containers: {}, add nodes: {}, delete nodes: {}.",
        latestNodeInfos, registered, addNodes, deleteContainers);

    //Register new nodes.
    for (NodeInfo node : addNodes) {
      registerContainer(node);
    }

    //Clear deleted nodes
    for (Container container : deleteContainers) {
      unregisterContainer(container);
    }
  }

  /**
   * Creates a container for the node, publishes its ray resources and registers it.
   */
  private void registerContainer(final NodeInfo nodeInfo) {
    LOG.info("Register container {}.", nodeInfo);

    Container container =
        new Container(nodeInfo.nodeId, nodeInfo.nodeAddress, nodeInfo.nodeHostname);
    // Copy the map: allocation and deallocation mutate the container's available resources
    // in place, which must not write through to the NodeInfo returned by Ray.
    if (nodeInfo.resources != null) {
      container.setAvailableResource(new HashMap<>(nodeInfo.resources));
    }

    //Create ray resource.
    Ray.setResource(container.getNodeId(),
        container.getName(),
        resources.getMaxActorNumPerContainer());
    //Mark container is already registered.
    Ray.setResource(container.getNodeId(),
        CONTAINER_ENGAGED_KEY, 1);

    // update register container list
    resources.getRegisterContainers().add(container);
  }

  /**
   * Removes the container's ray resources and drops it from the registered list.
   */
  private void unregisterContainer(final Container container) {
    LOG.info("Unregister container {}.", container);

    // delete resource with capacity to 0
    Ray.setResource(container.getNodeId(), container.getName(), 0);
    Ray.setResource(container.getNodeId(), CONTAINER_ENGAGED_KEY, 0);

    // remove from container map
    resources.getRegisterContainers().remove(container);
  }
}

View file

@ -0,0 +1,42 @@
package org.ray.streaming.runtime.core.master.scheduler.strategy;
import java.util.List;
import java.util.Map;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.resource.Container;
import org.ray.streaming.runtime.core.resource.ContainerID;
import org.ray.streaming.runtime.core.resource.Resources;
import org.ray.streaming.runtime.core.resource.Slot;
/**
* The SlotAssignStrategy manages a set of slots. When a container is
* registered to ResourceManager, slots are assigned to it.
*/
public interface SlotAssignStrategy {
/**
* Calculate slot number per container.
*
* @param containers containers the slots will be spread over.
* @param maxParallelism max parallelism to be served by the slots.
* @return number of slots each container should hold.
*/
int getSlotNumPerContainer(List<Container> containers, int maxParallelism);
/**
* Allocate slots to the given containers.
*
* @param containers containers receiving the slots.
* @param slotNumPerContainer number of slots per container.
*/
void allocateSlot(final List<Container> containers, final int slotNumPerContainer);
/**
* Assign slots to the execution vertices of the given execution graph.
*
* @param executionGraph graph whose execution vertices need slots.
* @return mapping from container id to that container's slots.
*/
Map<ContainerID, List<Slot>> assignSlot(ExecutionGraph executionGraph);
/**
* Get slot assign strategy name
*
* @return the strategy name.
*/
String getName();
/**
* Set resources.
*
* @param resources resource description the strategy operates on.
*/
void setResources(Resources resources);
}

View file

@ -0,0 +1,24 @@
package org.ray.streaming.runtime.core.master.scheduler.strategy;
import org.ray.streaming.runtime.config.types.SlotAssignStrategyType;
import org.ray.streaming.runtime.core.master.scheduler.strategy.impl.PipelineFirstStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Factory creating the {@link SlotAssignStrategy} implementation for a strategy type.
 */
public class SlotAssignStrategyFactory {

  private static final Logger LOG = LoggerFactory.getLogger(SlotAssignStrategyFactory.class);

  /**
   * Creates a new strategy instance for the given type.
   *
   * @param type requested strategy type.
   * @return a new strategy instance.
   * @throws RuntimeException if no implementation exists for {@code type}.
   */
  public static SlotAssignStrategy getStrategy(final SlotAssignStrategyType type) {
    LOG.info("Slot assign strategy is: {}.", type);
    switch (type) {
      case PIPELINE_FIRST_STRATEGY:
        return new PipelineFirstStrategy();
      default:
        // Name the requested type in the message; no strategy object exists on this path.
        throw new RuntimeException("strategy config error, no impl found for " + type);
    }
  }
}

View file

@ -0,0 +1,205 @@
package org.ray.streaming.runtime.core.master.scheduler.strategy.impl;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.streaming.runtime.config.types.SlotAssignStrategyType;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategy;
import org.ray.streaming.runtime.core.resource.Container;
import org.ray.streaming.runtime.core.resource.ContainerID;
import org.ray.streaming.runtime.core.resource.Resources;
import org.ray.streaming.runtime.core.resource.Slot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Based on Ray dynamic resource function, resource details(by ray gcs get) and
 * execution logic diagram, PipelineFirstStrategy provides an actor scheduling
 * strategy to make the cluster load balanced and the scheduling controllable.
 * Assume that we have 2 containers and have a DAG graph composed of a source node with parallelism
 * of 2 and a sink node with parallelism of 2, the structure will be like:
 * <pre>
 * container_0
 *   |- source_1
 *   |- sink_1
 * container_1
 *   |- source_2
 *   |- sink_2
 * </pre>
 */
public class PipelineFirstStrategy implements SlotAssignStrategy {

  public static final Logger LOG = LoggerFactory.getLogger(PipelineFirstStrategy.class);

  protected Resources resources;

  /**
   * Computes {@code ceil(max(maxParallelism, containerCount) / containerCount)}.
   *
   * @param containers containers the slots are spread over.
   * @param maxParallelism max parallelism of the execution graph.
   * @return number of slots per container.
   */
  @Override
  public int getSlotNumPerContainer(List<Container> containers, int maxParallelism) {
    LOG.info("max parallelism: {}, container size: {}.", maxParallelism, containers.size());
    int slotNumPerContainer =
        (int) Math.ceil(Math.max(maxParallelism, containers.size()) * 1.0 / containers.size());
    LOG.info("slot num per container: {}.", slotNumPerContainer);
    return slotNumPerContainer;
  }

  /**
   * Allocate slot to target container, assume that we have 2 containers and max parallelism is 5,
   * the structure will be like:
   * <pre>
   * container_0
   *   |- slot_0
   *   |- slot_2
   *   |- slot_4
   * container_1
   *   |- slot_1
   *   |- slot_3
   *   |- slot_5
   * </pre>
   */
  @Override
  public void allocateSlot(List<Container> containers,
      int slotNumPerContainer) {
    int maxSlotSize = containers.size() * slotNumPerContainer;
    LOG.info("Allocate slot, maxSlotSize: {}.", maxSlotSize);

    // Deal the slot ids out to the containers round-robin.
    for (int slotId = 0; slotId < maxSlotSize; ++slotId) {
      Container targetContainer = containers.get(slotId % containers.size());
      Slot slot = new Slot(slotId, targetContainer.getContainerId());
      targetContainer.getSlots().add(slot);
    }

    // update new added containers' allocating map
    containers.forEach(c -> {
      List<Slot> slots = c.getSlots();
      resources.getAllocatingMap().put(c.getContainerId(), slots);
    });

    LOG.info("Allocate slot result: {}.", resources.getAllocatingMap());
  }

  /**
   * Assigns every execution vertex of the graph to a slot.
   *
   * <p>Vertices are visited index by index across all job vertices (pipeline first), and each
   * one goes to the current container once it passes the capacity and resource check.
   *
   * @param executionGraph graph whose execution vertices need slots.
   * @return the allocating map from container id to that container's slots.
   */
  @Override
  public Map<ContainerID, List<Slot>> assignSlot(ExecutionGraph executionGraph) {
    LOG.info("Container available resources: {}.", resources.getAllAvailableResource());
    Map<Integer, ExecutionJobVertex> vertices = executionGraph.getExecutionJobVertexMap();
    Map<Integer, Integer> vertexRemainingNum = new HashMap<>();
    vertices.forEach((k, v) -> {
      int size = v.getExecutionVertices().size();
      vertexRemainingNum.put(k, size);
    });
    int totalExecutionVerticesNum = vertexRemainingNum.values().stream()
        .mapToInt(Integer::intValue)
        .sum();
    int containerNum = resources.getRegisterContainers().size();
    // Spread the vertices evenly: each container takes at most ceil(total / containers).
    resources.setActorPerContainer((int) Math
        .ceil(totalExecutionVerticesNum * 1.0 / containerNum));
    LOG.info("Total execution vertices num: {}, container num: {}, capacity per container: {}.",
        totalExecutionVerticesNum, containerNum, resources.getActorPerContainer());

    int maxParallelism = executionGraph.getMaxParallelism();

    for (int i = 0; i < maxParallelism; i++) {
      for (ExecutionJobVertex executionJobVertex : vertices.values()) {
        List<ExecutionVertex> exeVertices = executionJobVertex.getExecutionVertices();
        // current job vertex assign finished
        if (exeVertices.size() <= i) {
          continue;
        }

        ExecutionVertex executionVertex = exeVertices.get(i);

        //check current container has enough resources.
        checkResource(executionVertex.getResources());

        Container targetContainer = resources.getRegisterContainers()
            .get(resources.getCurrentContainerIndex());
        List<Slot> targetSlots = targetContainer.getSlots();
        allocate(executionVertex, targetContainer, targetSlots.get(i % targetSlots.size()));
      }
    }

    return resources.getAllocatingMap();
  }

  /**
   * Moves the current container index forward until a container with enough capacity and
   * resource is found.
   *
   * @param requiredResource resource required by the vertex being placed.
   * @throws IllegalArgumentException if every registered container was checked without success.
   */
  private void checkResource(Map<String, Double> requiredResource) {
    int checkedNum = 0;
    // if current container does not have enough resource, go to the next one (loop)
    while (!hasEnoughResource(requiredResource)) {
      checkedNum++;
      resources.setCurrentContainerIndex((resources.getCurrentContainerIndex() + 1) %
          resources.getRegisterContainers().size());

      // Guava only substitutes %s placeholders in Preconditions message templates.
      Preconditions.checkArgument(checkedNum < resources.getRegisterContainers().size(),
          "No enough resource left, required resource: %s, available resource: %s.",
          requiredResource, resources.getAllAvailableResource());
      resources.setCurrentContainerAllocatedActorNum(0);
    }
  }

  /**
   * Tells whether the current container can take one more vertex with the given requirement.
   *
   * <p>A null requirement is always satisfied. Otherwise the container must hold fewer actors
   * than the per-container capacity, and for each required key it also offers, the available
   * amount must not be below the required amount.
   */
  private boolean hasEnoughResource(Map<String, Double> requiredResource) {
    LOG.info("Check resource for container, index: {}.", resources.getCurrentContainerIndex());

    if (null == requiredResource) {
      return true;
    }

    Container currentContainer = resources.getRegisterContainers()
        .get(resources.getCurrentContainerIndex());
    List<Slot> slotActors = resources.getAllocatingMap().get(currentContainer.getContainerId());
    if (slotActors != null && slotActors.size() > 0) {
      long allocatedActorNum = slotActors.stream()
          .map(Slot::getExecutionVertexIds)
          .mapToLong(List::size)
          .sum();
      if (allocatedActorNum >= resources.getActorPerContainer()) {
        LOG.info("Container remaining capacity is 0. used: {}, total: {}.", allocatedActorNum,
            resources.getActorPerContainer());
        return false;
      }
    }

    Map<String, Double> availableResource = currentContainer.getAvailableResource();
    for (Map.Entry<String, Double> entry : requiredResource.entrySet()) {
      if (availableResource.containsKey(entry.getKey())) {
        if (availableResource.get(entry.getKey()) < entry.getValue()) {
          LOG.warn("No enough resource for container {}. required: {}, available: {}.",
              currentContainer.getAddress(), requiredResource, availableResource);
          return false;
        }
      }
    }

    return true;
  }

  /**
   * Binds the vertex to the slot, records the vertex id on the slot and advances to the next
   * container when the current one has reached the per-container capacity.
   */
  private void allocate(ExecutionVertex vertex, Container container, Slot slot) {
    // set slot for execution vertex
    LOG.info("Set slot {} to vertex {}.", slot, vertex);
    vertex.setSlotIfNotExist(slot);

    Slot useSlot = resources.getAllocatingMap().get(container.getContainerId())
        .stream().filter(s -> s.getId() == slot.getId()).findFirst().get();
    useSlot.getExecutionVertexIds().add(vertex.getVertexId());

    // current container reaches capacity limitation, go to the next one.
    resources.setCurrentContainerAllocatedActorNum(
        resources.getCurrentContainerAllocatedActorNum() + 1);
    if (resources.getCurrentContainerAllocatedActorNum() >= resources.getActorPerContainer()) {
      resources.setCurrentContainerIndex(
          (resources.getCurrentContainerIndex() + 1) % resources.getRegisterContainers().size());
      resources.setCurrentContainerAllocatedActorNum(0);
    }
  }

  @Override
  public String getName() {
    return SlotAssignStrategyType.PIPELINE_FIRST_STRATEGY.getValue();
  }

  @Override
  public void setResources(Resources resources) {
    this.resources = resources;
  }
}

View file

@ -0,0 +1,86 @@
package org.ray.streaming.runtime.core.resource;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.api.id.UniqueId;
/**
* Resource manager unit abstraction.
* Container identifies the available resource(cpu,mem) and allocated slots.
*/
public class Container implements Serializable {
// Unique id of this container; assigned by the three-argument constructor.
private ContainerID containerId;
// Id of the node this container stands for.
private UniqueId nodeId;
// Address of the node.
private String address;
// Hostname of the node.
private String hostname;
// Available amount per resource key; empty until setAvailableResource is called.
private Map<String, Double> availableResource = new HashMap<>();
// Slots belonging to this container.
private List<Slot> slots = new ArrayList<>();
/**
* Creates an empty container.
* NOTE(review): containerId stays null here, so getName() throws a NullPointerException
* until setContainerId is called.
*/
public Container() {
}
/**
* Creates a container for a node and gives it a fresh ContainerID.
*
* @param nodeId id of the node.
* @param address address of the node.
* @param hostname hostname of the node.
*/
public Container(UniqueId nodeId, String address, String hostname) {
this.containerId = new ContainerID();
this.nodeId = nodeId;
this.address = address;
this.hostname = hostname;
}
public void setContainerId(ContainerID containerId) {
this.containerId = containerId;
}
public ContainerID getContainerId() {
return containerId;
}
/**
* Returns the container name, which is the string form of the container id.
*/
public String getName() {
return containerId.toString();
}
public String getAddress() {
return address;
}
public UniqueId getNodeId() {
return nodeId;
}
public String getHostname() {
return hostname;
}
/**
* Returns the internal available-resource map itself (not a copy).
*/
public Map<String, Double> getAvailableResource() {
return availableResource;
}
/**
* Stores the given map as the available resources; the reference is kept, not copied.
*/
public void setAvailableResource(Map<String, Double> availableResource) {
this.availableResource = availableResource;
}
/**
* Returns the internal slot list itself (not a copy).
*/
public List<Slot> getSlots() {
return slots;
}
public void setSlots(List<Slot> slots) {
this.slots = slots;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("containerId", containerId)
.add("nodeId", nodeId)
.add("address", address)
.add("hostname", hostname)
.add("availableResource", availableResource)
.add("slots", slots)
.toString();
}
}

View file

@ -0,0 +1,9 @@
package org.ray.streaming.runtime.core.resource;
import org.ray.streaming.runtime.core.common.AbstractID;
/**
* Container unique identifier.
*
* <p>Adds no members of its own; all behavior is inherited from AbstractID.
*/
public class ContainerID extends AbstractID {
}

View file

@ -0,0 +1,134 @@
package org.ray.streaming.runtime.core.resource;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.api.id.UniqueId;
import org.ray.streaming.runtime.config.master.ResourceConfig;
/**
* Resource description of ResourceManager.
*
* <p>Holds the registered containers, the container-to-slots mapping and the counters that
* track the allocation state.
*/
public class Resources implements Serializable {
/**
* Available containers registered to ResourceManager.
*/
private List<Container> registerContainers = new ArrayList<>();
/**
* Mapping of allocated container to slots.
*/
private Map<ContainerID, List<Slot>> allocatingMap = new HashMap<>(16);
/**
* Number of slots per container.
*/
private int slotNumPerContainer = 0;
/**
* Number of actors per container.
*/
private int actorPerContainer = 0;
/**
* Number of actors that the current container has allocated.
*/
private int currentContainerAllocatedActorNum = 0;
/**
* The container index currently being allocated.
*/
private int currentContainerIndex = 0;
/**
* Max number of actors per container, read from the resource config at construction.
*/
private int maxActorNumPerContainer;
/**
* Creates the resource description.
*
* @param resourceConfig config providing the max actor number per container.
*/
public Resources(ResourceConfig resourceConfig) {
maxActorNumPerContainer = resourceConfig.maxActorNumPerContainer();
}
/**
* Returns the internal list of registered containers itself (not a copy).
*/
public List<Container> getRegisterContainers() {
return registerContainers;
}
public void setSlotNumPerContainer(int slotNumPerContainer) {
this.slotNumPerContainer = slotNumPerContainer;
}
public int getSlotNumPerContainer() {
return slotNumPerContainer;
}
public void setRegisterContainers(
List<Container> registerContainers) {
this.registerContainers = registerContainers;
}
public void setCurrentContainerIndex(int currentContainerIndex) {
this.currentContainerIndex = currentContainerIndex;
}
public int getCurrentContainerIndex() {
return currentContainerIndex;
}
public void setCurrentContainerAllocatedActorNum(int currentContainerAllocatedActorNum) {
this.currentContainerAllocatedActorNum = currentContainerAllocatedActorNum;
}
public int getCurrentContainerAllocatedActorNum() {
return currentContainerAllocatedActorNum;
}
public int getActorPerContainer() {
return actorPerContainer;
}
public void setActorPerContainer(int actorPerContainer) {
this.actorPerContainer = actorPerContainer;
}
/**
* Finds the registered container with the given id.
*
* @param containerID id to look for.
* @return the first registered container whose id equals {@code containerID}.
* @throws java.util.NoSuchElementException if no registered container matches
* (thrown by Optional.get()).
*/
public Container getRegisterContainerByContainerId(ContainerID containerID) {
return registerContainers.stream()
.filter(container -> container.getContainerId().equals(containerID))
.findFirst().get();
}
public int getMaxActorNumPerContainer() {
return maxActorNumPerContainer;
}
/**
* Returns the internal container-to-slots map itself (not a copy).
*/
public Map<ContainerID, List<Slot>> getAllocatingMap() {
return allocatingMap;
}
public void setAllocatingMap(
Map<ContainerID, List<Slot>> allocatingMap) {
this.allocatingMap = allocatingMap;
}
/**
* Collects the available resources of all registered containers.
*
* @return a new map from node id to that container's available-resource map; the inner maps
* are the containers' own maps, not copies.
*/
public Map<UniqueId, Map<String, Double>> getAllAvailableResource() {
Map<UniqueId, Map<String, Double>> availableResource = new HashMap<>();
for (Container container : registerContainers) {
availableResource.put(container.getNodeId(), container.getAvailableResource());
}
return availableResource;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("registerContainers", registerContainers)
.add("allocatingMap", allocatingMap)
.add("slotNumPerContainer", slotNumPerContainer)
.add("actorPerContainer", actorPerContainer)
.add("currentContainerAllocatedActorNum", currentContainerAllocatedActorNum)
.add("currentContainerIndex", currentContainerIndex)
.add("maxActorNumPerContainer", maxActorNumPerContainer)
.toString();
}
}

View file

@ -0,0 +1,54 @@
package org.ray.streaming.runtime.core.resource;
import com.google.common.base.MoreObjects;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class Slot implements Serializable {
private int id;
/**
* The slot belongs to a container.
*/
private ContainerID containerID;
private AtomicInteger actorCount = new AtomicInteger(0);
/**
* List of ExecutionVertex ids belong to the slot.
*/
private List<Integer> executionVertexIds = new ArrayList<>();
public Slot(int id, ContainerID containerID) {
this.id = id;
this.containerID = containerID;
}
public int getId() {
return id;
}
public ContainerID getContainerID() {
return containerID;
}
public AtomicInteger getActorCount() {
return actorCount;
}
public List<Integer> getExecutionVertexIds() {
return executionVertexIds;
}
public void setExecutionVertexIds(List<Integer> executionVertexIds) {
this.executionVertexIds = executionVertexIds;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("id", id)
.add("containerID", containerID)
.add("actorCount", actorCount)
.toString();
}
}

View file

@ -50,7 +50,8 @@ public class GraphManagerImpl implements GraphManager {
long buildTime = executionGraph.getBuildTime();
for (JobVertex jobVertex : jobGraph.getJobVertexList()) {
int jobVertexId = jobVertex.getVertexId();
exeJobVertexMap.put(jobVertexId, new ExecutionJobVertex(jobVertex));
exeJobVertexMap.put(jobVertexId,
new ExecutionJobVertex(jobVertex, runtimeContext));
}
// connect vertex

View file

@ -1,7 +1,13 @@
package org.ray.streaming.runtime.graph;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.DataStreamSource;
@ -9,6 +15,7 @@ import org.ray.streaming.api.stream.StreamSink;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.jobgraph.JobGraphBuilder;
import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.config.StreamingConfig;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
@ -26,7 +33,9 @@ public class ExecutionGraphTest extends BaseUnitTest {
@Test
public void testBuildExecutionGraph() {
GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(null));
Map<String, String> jobConf = new HashMap<>();
StreamingConfig streamingConfig = new StreamingConfig(jobConf);
GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(streamingConfig));
JobGraph jobGraph = buildJobGraph();
ExecutionGraph executionGraph = buildExecutionGraph(graphManager, jobGraph);
List<ExecutionJobVertex> executionJobVertices = executionGraph.getExecutionJobVertexLices();

View file

@ -0,0 +1,99 @@
package org.ray.streaming.runtime.resourcemanager;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.aeonbits.owner.util.Collections;
import org.ray.api.Ray;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.TestHelper;
import org.ray.streaming.runtime.config.StreamingConfig;
import org.ray.streaming.runtime.config.global.CommonConfig;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.master.resourcemanager.ResourceManager;
import org.ray.streaming.runtime.core.master.resourcemanager.ResourceManagerImpl;
import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategy;
import org.ray.streaming.runtime.core.master.scheduler.strategy.impl.PipelineFirstStrategy;
import org.ray.streaming.runtime.core.resource.Container;
import org.ray.streaming.runtime.core.resource.ContainerID;
import org.ray.streaming.runtime.core.resource.Slot;
import org.ray.streaming.runtime.graph.ExecutionGraphTest;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.ray.streaming.runtime.master.graphmanager.GraphManager;
import org.ray.streaming.runtime.master.graphmanager.GraphManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
 * Walks through the ResourceManager API end to end: strategy lookup, container
 * registration, slot allocation/assignment and per-vertex resource allocation.
 */
public class ResourceManagerTest extends BaseUnitTest {

  private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerTest.class);

  /**
   * Initializes Ray and switches the unit-test flag on before the tests of this class run.
   */
  @org.testng.annotations.BeforeClass
  public void setUp() {
    // ray init
    // NOTE(review): no matching Ray shutdown is visible in this class; confirm whether
    // that is handled elsewhere.
    Ray.init();
    TestHelper.setUTFlag();
  }

  /**
   * Clears the unit-test flag set in {@link #setUp()}.
   */
  @org.testng.annotations.AfterClass
  public void tearDown() {
    TestHelper.clearUTFlag();
  }

  /**
   * Registers two containers with 16 CPU / 128 MEM each, assigns the execution graph to
   * them and allocates resources vertex by vertex. With 2 CPU / 10 MEM configured per
   * task, the final assertions expect 14 CPU / 118 MEM left on each container.
   */
  @Test
  public void testApi() {
    // Job configuration: both resource limits enabled, 2 CPU and 10 MEM per task.
    Map<String, String> jobConf = new HashMap<>();
    jobConf.put(CommonConfig.JOB_NAME, "testApi");
    jobConf.put(ResourceConfig.TASK_RESOURCE_CPU_LIMIT_ENABLE, "true");
    jobConf.put(ResourceConfig.TASK_RESOURCE_MEM_LIMIT_ENABLE, "true");
    jobConf.put(ResourceConfig.TASK_RESOURCE_MEM, "10");
    jobConf.put(ResourceConfig.TASK_RESOURCE_CPU, "2");
    StreamingConfig config = new StreamingConfig(jobConf);

    ResourceManager resourceManager = new ResourceManagerImpl(new JobRuntimeContext(config));

    SlotAssignStrategy slotAssignStrategy = resourceManager.getSlotAssignStrategy();
    Assert.assertTrue(slotAssignStrategy instanceof PipelineFirstStrategy);

    // Two containers with identical capacity; the second one gets its own copy of the map.
    final String cpuKey = ResourceConfig.RESOURCE_KEY_CPU;
    final String memKey = ResourceConfig.RESOURCE_KEY_MEM;
    Map<String, Double> capacity = new HashMap<>();
    capacity.put(cpuKey, 16.0);
    capacity.put(memKey, 128.0);

    Container container1 = new Container(null, "testAddress1", "testHostName1");
    container1.setAvailableResource(capacity);
    Container container2 = new Container(null, "testAddress2", "testHostName2");
    container2.setAvailableResource(new HashMap<>(capacity));

    List<Container> containers = Collections.list(container1, container2);
    resourceManager.getResources().getRegisterContainers().addAll(containers);
    Assert.assertEquals(resourceManager.getRegisteredContainers().size(), 2);

    //build ExecutionGraph
    GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(config));
    JobGraph jobGraph = ExecutionGraphTest.buildJobGraph();
    ExecutionGraph executionGraph = ExecutionGraphTest.buildExecutionGraph(graphManager, jobGraph);

    int maxParallelism = executionGraph.getMaxParallelism();
    int slotNumPerContainer = slotAssignStrategy.getSlotNumPerContainer(containers, maxParallelism);
    Assert.assertEquals(slotNumPerContainer, 1);

    slotAssignStrategy.allocateSlot(containers, slotNumPerContainer);

    Map<ContainerID, List<Slot>> allocatingMap = slotAssignStrategy.assignSlot(executionGraph);
    Assert.assertEquals(allocatingMap.size(), 2);

    // Allocate resources for every vertex on the container that owns the vertex's slot.
    executionGraph.getAllAddedExecutionVertices().forEach(executionVertex -> {
      ContainerID ownerId = executionVertex.getSlot().getContainerID();
      Container owner = resourceManager.getResources().getRegisterContainerByContainerId(ownerId);
      Map<String, Double> allocated =
          resourceManager.allocateResource(owner, executionVertex.getResources());
      Assert.assertNotNull(allocated);
    });

    Assert.assertEquals(container1.getAvailableResource().get(cpuKey), 14.0);
    Assert.assertEquals(container2.getAvailableResource().get(cpuKey), 14.0);
    Assert.assertEquals(container1.getAvailableResource().get(memKey), 118.0);
    Assert.assertEquals(container2.getAvailableResource().get(memKey), 118.0);
  }
}

View file

@ -0,0 +1,108 @@
package org.ray.streaming.runtime.schedule.strategy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import com.google.common.collect.Lists;
import org.aeonbits.owner.ConfigFactory;
import org.ray.api.RayActor;
import org.ray.api.id.ActorId;
import org.ray.api.id.ObjectId;
import org.ray.api.id.UniqueId;
import org.ray.runtime.actor.LocalModeRayActor;
import org.ray.streaming.api.context.RuntimeContext;
import org.ray.streaming.api.context.StreamingContext;
import org.ray.streaming.api.stream.DataStream;
import org.ray.streaming.api.stream.DataStreamSink;
import org.ray.streaming.api.stream.DataStreamSource;
import org.ray.streaming.jobgraph.JobGraph;
import org.ray.streaming.jobgraph.JobGraphBuilder;
import org.ray.streaming.runtime.BaseUnitTest;
import org.ray.streaming.runtime.config.StreamingConfig;
import org.ray.streaming.runtime.config.StreamingMasterConfig;
import org.ray.streaming.runtime.config.master.ResourceConfig;
import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategy;
import org.ray.streaming.runtime.core.master.scheduler.strategy.impl.PipelineFirstStrategy;
import org.ray.streaming.runtime.core.resource.Container;
import org.ray.streaming.runtime.core.resource.ContainerID;
import org.ray.streaming.runtime.core.resource.Resources;
import org.ray.streaming.runtime.core.resource.Slot;
import org.ray.streaming.runtime.graph.ExecutionGraphTest;
import org.ray.streaming.runtime.master.JobRuntimeContext;
import org.ray.streaming.runtime.master.graphmanager.GraphManager;
import org.ray.streaming.runtime.master.graphmanager.GraphManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Tests for {@link PipelineFirstStrategy}: slot count computation, slot allocation on
 * containers and assignment of the execution graph to the allocated slots.
 */
public class PipelineFirstStrategyTest extends BaseUnitTest {

  private static final Logger LOG = LoggerFactory.getLogger(PipelineFirstStrategyTest.class);

  private SlotAssignStrategy strategy;
  private List<Container> containers = new ArrayList<>();
  private JobGraph jobGraph;
  private ExecutionGraph executionGraph;
  private int maxParallelism;

  /**
   * Builds the strategy with two registered containers (16 CPU / 128 MEM each) and the
   * execution graph shared by all tests of this class.
   */
  @BeforeClass
  public void setUp() {
    strategy = new PipelineFirstStrategy();
    Map<String, String> conf = new HashMap<>();
    ResourceConfig resourceConfig = ConfigFactory.create(ResourceConfig.class, conf);
    Resources resources = new Resources(resourceConfig);

    Map<String, Double> containerResource = new HashMap<>();
    containerResource.put(ResourceConfig.RESOURCE_KEY_CPU, 16.0);
    containerResource.put(ResourceConfig.RESOURCE_KEY_MEM, 128.0);
    for (int i = 0; i < 2; ++i) {
      UniqueId uniqueId = UniqueId.randomId();
      Container container = new Container(uniqueId, "1.1.1." + i, "localhost" + i);
      // Give every container its own map so that a change to one container's
      // available resources can never show up on the other one.
      container.setAvailableResource(new HashMap<>(containerResource));
      containers.add(container);
      resources.getRegisterContainers().add(container);
    }
    strategy.setResources(resources);

    //build ExecutionGraph
    Map<String, String> jobConf = new HashMap<>();
    StreamingConfig streamingConfig = new StreamingConfig(jobConf);
    GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(streamingConfig));
    jobGraph = ExecutionGraphTest.buildJobGraph();
    executionGraph = ExecutionGraphTest.buildExecutionGraph(graphManager, jobGraph);
    maxParallelism = executionGraph.getMaxParallelism();
  }

  /**
   * Checks the slot count formula. Declared {@code void} because TestNG, by default,
   * ignores {@code @Test} methods that return a value.
   */
  @Test
  public void testSlotNumPerContainer() {
    computeAndCheckSlotNumPerContainer();
  }

  /**
   * Allocates slots and checks that every container received the computed number.
   */
  @Test
  public void testAllocateSlot() {
    int slotNumPerContainer = computeAndCheckSlotNumPerContainer();
    strategy.allocateSlot(containers, slotNumPerContainer);
    for (Container container : containers) {
      Assert.assertEquals(container.getSlots().size(), slotNumPerContainer);
    }
  }

  /**
   * Assigns the execution graph and checks that every returned slot holds at least one
   * execution vertex. Runs after {@link #testAllocateSlot()}, which creates the slots;
   * the dependency is declared instead of relying on method execution order.
   */
  @Test(dependsOnMethods = "testAllocateSlot")
  public void testAssignSlot() {
    Map<ContainerID, List<Slot>> allocatingMap = strategy.assignSlot(executionGraph);
    for (Entry<ContainerID, List<Slot>> containerSlotEntry : allocatingMap.entrySet()) {
      containerSlotEntry.getValue()
          .forEach(slot -> Assert.assertNotEquals(slot.getExecutionVertexIds().size(), 0));
    }
  }

  /**
   * Asks the strategy for the slot count per container and asserts that it equals
   * {@code ceil(max(maxParallelism, containerCount) / containerCount)}.
   *
   * @return the slot count reported by the strategy
   */
  private int computeAndCheckSlotNumPerContainer() {
    int slotNumPerContainer = strategy.getSlotNumPerContainer(containers, maxParallelism);
    Assert.assertEquals(slotNumPerContainer,
        (int) Math.ceil(Math.max(maxParallelism, containers.size()) * 1.0 / containers.size()));
    return slotNumPerContainer;
  }
}