From 7e115490d963344ec5e20cc0bc16907835f6b67f Mon Sep 17 00:00:00 2001 From: JianZhangYang Date: Mon, 24 Feb 2020 19:32:50 +0800 Subject: [PATCH] Streaming scheduler resourcemanager (#7070) --- .../runtime/config/StreamingMasterConfig.java | 5 + .../runtime/config/master/ResourceConfig.java | 92 ++++++++ .../config/types/SlotAssignStrategyType.java | 21 ++ .../runtime/core/common/AbstractID.java | 123 +++++++++++ .../graph/executiongraph/ExecutionEdge.java | 1 - .../executiongraph/ExecutionJobVertex.java | 10 +- .../graph/executiongraph/ExecutionVertex.java | 45 +++- .../resourcemanager/ResourceManager.java | 53 +++++ .../resourcemanager/ResourceManagerImpl.java | 200 +++++++++++++++++ .../strategy/SlotAssignStrategy.java | 42 ++++ .../strategy/SlotAssignStrategyFactory.java | 24 ++ .../strategy/impl/PipelineFirstStrategy.java | 205 ++++++++++++++++++ .../runtime/core/resource/Container.java | 86 ++++++++ .../runtime/core/resource/ContainerID.java | 9 + .../runtime/core/resource/Resources.java | 134 ++++++++++++ .../streaming/runtime/core/resource/Slot.java | 54 +++++ .../master/graphmanager/GraphManagerImpl.java | 3 +- .../runtime/graph/ExecutionGraphTest.java | 11 +- .../resourcemanager/ResourceManagerTest.java | 99 +++++++++ .../strategy/PipelineFirstStrategyTest.java | 108 +++++++++ 20 files changed, 1319 insertions(+), 6 deletions(-) create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/master/ResourceConfig.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/types/SlotAssignStrategyType.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/common/AbstractID.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/resourcemanager/ResourceManager.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/resourcemanager/ResourceManagerImpl.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/SlotAssignStrategy.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/SlotAssignStrategyFactory.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/impl/PipelineFirstStrategy.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Container.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/ContainerID.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Resources.java create mode 100644 streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Slot.java create mode 100644 streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/resourcemanager/ResourceManagerTest.java create mode 100644 streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/strategy/PipelineFirstStrategyTest.java diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingMasterConfig.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingMasterConfig.java index 2d6c6629a..907b46ba3 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingMasterConfig.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/StreamingMasterConfig.java @@ -1,6 +1,8 @@ package org.ray.streaming.runtime.config; import java.util.Map; +import org.aeonbits.owner.ConfigFactory; +import org.ray.streaming.runtime.config.master.ResourceConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,7 +13,10 @@ public class StreamingMasterConfig extends StreamingGlobalConfig { private static final Logger LOG = LoggerFactory.getLogger(StreamingMasterConfig.class); + public ResourceConfig resourceConfig; + public StreamingMasterConfig(final Map conf) { super(conf); + this.resourceConfig = ConfigFactory.create(ResourceConfig.class, conf); } } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/master/ResourceConfig.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/master/ResourceConfig.java new file mode 100644 index 000000000..9764fcec4 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/master/ResourceConfig.java @@ -0,0 +1,92 @@ +package org.ray.streaming.runtime.config.master; + +import org.ray.streaming.runtime.config.Config; + +/** + * Job resource management config. + */ +public interface ResourceConfig extends Config { + + /** + * CPU definition key of resource management. + */ + String RESOURCE_KEY_CPU = "CPU"; + + /** + * Memory definition key of resource management. + */ + String RESOURCE_KEY_MEM = "MEM"; + + /** + * Number of actors per container. + */ + String MAX_ACTOR_NUM_PER_CONTAINER = "streaming.container.per.max.actor"; + + /** + * The interval between detecting ray cluster nodes. + */ + String CONTAINER_RESOURCE_CHECk_INTERVAL_SECOND = "streaming.resource.check.interval.second"; + + /** + * CPU use by per task. + */ + String TASK_RESOURCE_CPU = "streaming.task.resource.cpu"; + + /** + * Memory use by each task + */ + String TASK_RESOURCE_MEM = "streaming.task.resource.mem"; + + /** + * Whether to enable CPU limit in resource control. + */ + String TASK_RESOURCE_CPU_LIMIT_ENABLE = "streaming.task.resource.cpu.limitation.enable"; + + /** + * Whether to enable memory limit in resource control. + */ + String TASK_RESOURCE_MEM_LIMIT_ENABLE = "streaming.task.resource.mem.limitation.enable"; + + /** + * Number of cpu per task. + */ + @DefaultValue(value = "1.0") + @Key(value = TASK_RESOURCE_CPU) + double taskCpuResource(); + + /** + * Memory size used by each task. + */ + @DefaultValue(value = "2.0") + @Key(value = TASK_RESOURCE_MEM) + double taskMemResource(); + + /** + * Whether to enable CPU limit in resource control. + */ + @DefaultValue(value = "true") + @Key(value = TASK_RESOURCE_CPU_LIMIT_ENABLE) + boolean isTaskCpuResourceLimit(); + + /** + * Whether to enable memory limit in resource control. + */ + @DefaultValue(value = "true") + @Key(value = TASK_RESOURCE_MEM_LIMIT_ENABLE) + boolean isTaskMemResourceLimit(); + + /** + * Number of actors per container. + */ + @DefaultValue(value = "500") + @Key(MAX_ACTOR_NUM_PER_CONTAINER) + int maxActorNumPerContainer(); + + /** + * The interval between detecting ray cluster nodes. + */ + @DefaultValue(value = "1") + @Key(value = CONTAINER_RESOURCE_CHECk_INTERVAL_SECOND) + long resourceCheckIntervalSecond(); + +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/types/SlotAssignStrategyType.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/types/SlotAssignStrategyType.java new file mode 100644 index 000000000..073fb8d27 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/config/types/SlotAssignStrategyType.java @@ -0,0 +1,21 @@ +package org.ray.streaming.runtime.config.types; + +public enum SlotAssignStrategyType { + + /** + * Resource scheduling strategy based on FF(First Fit) algorithm and pipeline. + */ + PIPELINE_FIRST_STRATEGY("pipeline_first_strategy", 0); + + private String value; + private int index; + + SlotAssignStrategyType(String value, int index) { + this.value = value; + this.index = index; + } + + public String getValue() { + return value; + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/common/AbstractID.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/common/AbstractID.java new file mode 100644 index 000000000..67361cae1 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/common/AbstractID.java @@ -0,0 +1,123 @@ +package org.ray.streaming.runtime.core.common; + +import java.io.Serializable; +import java.util.Random; + +/** + * Streaming system unique identity base class. + * For example, ${@link org.ray.streaming.runtime.core.resource.ContainerID } + */ +public class AbstractID implements Comparable, Serializable { + + private static final long serialVersionUID = 1L; + private static final Random RANDOM = new Random(); + private static final int SIZE_OF_LONG = 8; + private static final int SIZE_OF_UPPER_PART = 8; + private static final int SIZE_OF_LOWER_PART = 8; + + //lowerPart(long type) + upperPart(long type) + public static final int SIZE = SIZE_OF_UPPER_PART + SIZE_OF_LOWER_PART; + + protected final long upperPart; + protected final long lowerPart; + + private String toString; + + public AbstractID(byte[] bytes) { + if (bytes == null || bytes.length != SIZE) { + throw new IllegalArgumentException("Argument bytes must by an array of " + SIZE + " bytes"); + } + + this.lowerPart = byteArrayToLong(bytes, 0); + this.upperPart = byteArrayToLong(bytes, SIZE_OF_LONG); + } + + public AbstractID(long lowerPart, long upperPart) { + this.lowerPart = lowerPart; + this.upperPart = upperPart; + } + + public AbstractID(AbstractID id) { + if (id == null) { + throw new IllegalArgumentException("Id must not be null."); + } + this.lowerPart = id.lowerPart; + this.upperPart = id.upperPart; + } + + public AbstractID() { + this.lowerPart = RANDOM.nextLong(); + this.upperPart = RANDOM.nextLong(); + } + + public long getLowerPart() { + return lowerPart; + } + + public long getUpperPart() { + return upperPart; + } + + public byte[] getBytes() { + byte[] bytes = new byte[SIZE]; + longToByteArray(lowerPart, bytes, 0); + longToByteArray(upperPart, bytes, SIZE_OF_LONG); + return bytes; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj != null && obj.getClass() == getClass()) { + AbstractID that = (AbstractID) obj; + return that.lowerPart == this.lowerPart && that.upperPart == this.upperPart; + } else { + return false; + } + } + + @Override + public int hashCode() { + return ((int) this.lowerPart) ^ + ((int) (this.lowerPart >>> 32)) ^ + ((int) this.upperPart) ^ + ((int) (this.upperPart >>> 32)); + } + + @Override + public String toString() { + if (this.toString == null) { + final byte[] ba = new byte[SIZE]; + longToByteArray(this.lowerPart, ba, 0); + longToByteArray(this.upperPart, ba, SIZE_OF_LONG); + this.toString = new String(ba); + } + + return this.toString; + } + + @Override + public int compareTo(AbstractID abstractID) { + int diff1 = Long.compare(this.upperPart, abstractID.upperPart); + int diff2 = Long.compare(this.lowerPart, abstractID.lowerPart); + return diff1 == 0 ? diff2 : diff1; + } + + private static long byteArrayToLong(byte[] begin, int offset) { + long longNum = 0; + + for (int i = 0; i < SIZE_OF_LONG; ++i) { + longNum |= (begin[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3); + } + + return longNum; + } + + private static void longToByteArray(long longNum, byte[] byteArray, int offset) { + for (int i = 0; i < SIZE_OF_LONG; ++i) { + final int shift = i << 3; // i * 8 + byteArray[offset + SIZE_OF_LONG - 1 - i] = (byte) ((longNum & (0xffL << shift)) >>> shift); + } + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java index c18a4e2a5..46e5d2814 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionEdge.java @@ -58,7 +58,6 @@ public class ExecutionEdge implements Serializable { public String toString() { return MoreObjects.toStringHelper(this) .add("srcVertex", sourceVertex) - .add("targetVertex", targetVertex) .add("executionEdgeIndex", executionEdgeIndex) .toString(); } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java index 900b55307..466f863e6 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionJobVertex.java @@ -10,6 +10,7 @@ import org.ray.api.RayActor; import org.ray.streaming.jobgraph.JobVertex; import org.ray.streaming.jobgraph.VertexType; import org.ray.streaming.operator.StreamOperator; +import org.ray.streaming.runtime.master.JobRuntimeContext; import org.ray.streaming.runtime.worker.JobWorker; /** @@ -34,15 +35,18 @@ public class ExecutionJobVertex { private int parallelism; private List executionVertices; + private JobRuntimeContext runtimeContext; + private List inputEdges = new ArrayList<>(); private List outputEdges = new ArrayList<>(); - public ExecutionJobVertex(JobVertex jobVertex) { + public ExecutionJobVertex(JobVertex jobVertex, JobRuntimeContext runtimeContext) { this.jobVertexId = jobVertex.getVertexId(); this.jobVertexName = generateVertexName(jobVertexId, jobVertex.getStreamOperator()); this.streamOperator = jobVertex.getStreamOperator(); this.vertexType = jobVertex.getVertexType(); this.parallelism = jobVertex.getParallelism(); + this.runtimeContext = runtimeContext; this.executionVertices = createExecutionVertics(); } @@ -133,6 +137,10 @@ public class ExecutionJobVertex { return getVertexType() == VertexType.SINK; } + public JobRuntimeContext getRuntimeContext() { + return runtimeContext; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java index 9dd7fad64..d439692f9 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/graph/executiongraph/ExecutionVertex.java @@ -3,9 +3,14 @@ package org.ray.streaming.runtime.core.graph.executiongraph; import com.google.common.base.MoreObjects; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.ray.api.RayActor; import org.ray.api.id.ActorId; +import org.ray.streaming.runtime.config.master.ResourceConfig; +import org.ray.streaming.runtime.core.resource.Slot; +import org.ray.streaming.runtime.master.JobRuntimeContext; import org.ray.streaming.runtime.worker.JobWorker; /** @@ -27,8 +32,13 @@ public class ExecutionVertex implements Serializable { * Unique name generated by vertex name and index for execution vertex. */ private final String vertexName; + /** + * Resources used by ExecutionVertex. + */ + private final Map resources; private ExecutionVertexState state = ExecutionVertexState.TO_ADD; + private Slot slot; private RayActor workerActor; private List inputEdges = new ArrayList<>(); private List outputEdges = new ArrayList<>(); @@ -37,6 +47,7 @@ public class ExecutionVertex implements Serializable { this.vertexId = generateExecutionVertexId(jobVertexId, index); this.vertexIndex = index; this.vertexName = executionJobVertex.getJobVertexName() + "-" + vertexIndex; + this.resources = generateResources(executionJobVertex.getRuntimeContext()); } private int generateExecutionVertexId(int jobVertexId, int index) { @@ -105,16 +116,46 @@ public class ExecutionVertex implements Serializable { return vertexName; } + public Map getResources() { + return resources; + } + + public Slot getSlot() { + return slot; + } + + public void setSlot(Slot slot) { + this.slot = slot; + } + + public void setSlotIfNotExist(Slot slot) { + if (null == this.slot) { + this.slot = slot; + } + } + + private Map generateResources(JobRuntimeContext runtimeContext) { + Map resourceMap = new HashMap<>(); + ResourceConfig resourceConfig = runtimeContext.getConf().masterConfig.resourceConfig; + if (resourceConfig.isTaskCpuResourceLimit()) { + resourceMap.put(ResourceConfig.RESOURCE_KEY_CPU, resourceConfig.taskCpuResource()); + } + if (resourceConfig.isTaskMemResourceLimit()) { + resourceMap.put(ResourceConfig.RESOURCE_KEY_MEM, resourceConfig.taskMemResource()); + } + return resourceMap; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) .add("vertexId", vertexId) .add("vertexIndex", vertexIndex) .add("vertexName", vertexName) + .add("resources", resources) .add("state", state) + .add("slot", slot) .add("workerActor", workerActor) - .add("inputEdges", inputEdges) - .add("outputEdges", outputEdges) .toString(); } } diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/resourcemanager/ResourceManager.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/resourcemanager/ResourceManager.java new file mode 100644 index 000000000..6313780c6 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/resourcemanager/ResourceManager.java @@ -0,0 +1,53 @@ +package org.ray.streaming.runtime.core.master.resourcemanager; + +import java.util.List; +import java.util.Map; +import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategy; +import org.ray.streaming.runtime.core.resource.Container; +import org.ray.streaming.runtime.core.resource.Resources; + +/** + * The resource manager is responsible for resource de-/allocation and monitoring ray cluster. + */ +public interface ResourceManager { + + /** + * Get all registered container as a list. + * + * @return A list of containers. + */ + List getRegisteredContainers(); + + /** + * Allocate resource to actor. + * + * @param container Specify the container to be allocated. + * @param requireResource Resource size to be requested. + * @return Allocated resource. + */ + Map allocateResource(final Container container, + final Map requireResource); + + /** + * Deallocate resource to actor. + * + * @param container Specify the container to be deallocate. + * @param releaseResource Resource to be released. + */ + void deallocateResource(final Container container, + final Map releaseResource); + + /** + * Get the current slot-assign strategy from manager. + * + * @return Current slot-assign strategy. + */ + SlotAssignStrategy getSlotAssignStrategy(); + + /** + * Get resources from manager. + * + * @return Current resources in manager. + */ + Resources getResources(); +} \ No newline at end of file diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/resourcemanager/ResourceManagerImpl.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/resourcemanager/ResourceManagerImpl.java new file mode 100644 index 000000000..235a6a576 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/resourcemanager/ResourceManagerImpl.java @@ -0,0 +1,200 @@ +package org.ray.streaming.runtime.core.master.resourcemanager; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.ray.api.Ray; +import org.ray.api.runtimecontext.NodeInfo; +import org.ray.streaming.runtime.config.StreamingMasterConfig; +import org.ray.streaming.runtime.config.master.ResourceConfig; +import org.ray.streaming.runtime.config.types.SlotAssignStrategyType; +import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategy; +import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategyFactory; +import org.ray.streaming.runtime.core.resource.Container; +import org.ray.streaming.runtime.core.resource.Resources; +import org.ray.streaming.runtime.master.JobRuntimeContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResourceManagerImpl implements ResourceManager { + + private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerImpl.class); + + //Container used tag + private static final String CONTAINER_ENGAGED_KEY = "CONTAINER_ENGAGED_KEY"; + + /** + * Job runtime context. + */ + private JobRuntimeContext runtimeContext; + + /** + * Resource related configuration. + */ + private ResourceConfig resourceConfig; + + /** + * Slot assign strategy. + */ + private SlotAssignStrategy slotAssignStrategy; + + /** + * Resource description information. + */ + private final Resources resources; + + private final ScheduledExecutorService scheduledExecutorService; + + public ResourceManagerImpl(JobRuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + StreamingMasterConfig masterConfig = runtimeContext.getConf().masterConfig; + + this.resourceConfig = masterConfig.resourceConfig; + this.resources = new Resources(resourceConfig); + LOG.info("ResourceManagerImpl begin init, conf is {}, resources are {}.", + resourceConfig, resources); + + SlotAssignStrategyType slotAssignStrategyType = SlotAssignStrategyType.PIPELINE_FIRST_STRATEGY; + + this.slotAssignStrategy = SlotAssignStrategyFactory.getStrategy(slotAssignStrategyType); + this.slotAssignStrategy.setResources(resources); + LOG.info("Slot assign strategy: {}.", slotAssignStrategy.getName()); + + this.scheduledExecutorService = Executors.newScheduledThreadPool(1); + long intervalSecond = resourceConfig.resourceCheckIntervalSecond(); + this.scheduledExecutorService.scheduleAtFixedRate( + Ray.wrapRunnable(this::checkAndUpdateResources), 0, intervalSecond, TimeUnit.SECONDS); + + LOG.info("ResourceManagerImpl init success."); + } + + @Override + public Map allocateResource(final Container container, + final Map requireResource) { + LOG.info("Start to allocate resource for actor with container: {}.", container); + + // allocate resource to actor + Map resources = new HashMap<>(); + Map containResource = container.getAvailableResource(); + for (Map.Entry entry : containResource.entrySet()) { + if (requireResource.containsKey(entry.getKey())) { + double availableResource = entry.getValue() - requireResource.get(entry.getKey()); + entry.setValue(availableResource); + resources.put(entry.getKey(), requireResource.get(entry.getKey())); + } + } + + LOG.info("Allocate resource: {} to container {}.", requireResource, container); + return resources; + } + + @Override + public void deallocateResource(final Container container, + final Map releaseResource) { + LOG.info("Deallocating resource for container {}.", container); + + Map containResource = container.getAvailableResource(); + for (Map.Entry entry : containResource.entrySet()) { + if (releaseResource.containsKey(entry.getKey())) { + double availableResource = entry.getValue() + releaseResource.get(entry.getKey()); + LOG.info("Release source {}:{}", entry.getKey(), releaseResource.get(entry.getKey())); + entry.setValue(availableResource); + } + } + + LOG.info("Deallocated resource for container {} success.", container); + } + + @Override + public List getRegisteredContainers() { + return new ArrayList<>(resources.getRegisterContainers()); + } + + @Override + public SlotAssignStrategy getSlotAssignStrategy() { + return slotAssignStrategy; + } + + @Override + public Resources getResources() { + return this.resources; + } + + /** + * Check the status of ray cluster node and update the internal resource information of + * streaming system. + */ + private void checkAndUpdateResources() { + // get all started nodes + List latestNodeInfos = Ray.getRuntimeContext().getAllNodeInfo(); + + List addNodes = latestNodeInfos.stream().filter(nodeInfo -> { + for (Container container : resources.getRegisterContainers()) { + if (container.getNodeId().equals(nodeInfo.nodeId)) { + return false; + } + } + return true; + }).collect(Collectors.toList()); + + List deleteContainers = resources.getRegisterContainers().stream() + .filter(container -> { + for (NodeInfo nodeInfo : latestNodeInfos) { + if (nodeInfo.nodeId.equals(container.getNodeId())) { + return false; + } + } + return true; + }).collect(Collectors.toList()); + LOG.info("Latest node infos: {}, current containers: {}, add nodes: {}, delete nodes: {}.", + latestNodeInfos, resources.getRegisterContainers(), addNodes, deleteContainers); + + //Register new nodes. + if (!addNodes.isEmpty()) { + for (NodeInfo node : addNodes) { + registerContainer(node); + } + } + //Clear deleted nodes + if (!deleteContainers.isEmpty()) { + for (Container container : deleteContainers) { + unregisterContainer(container); + } + } + } + + private void registerContainer(final NodeInfo nodeInfo) { + LOG.info("Register container {}.", nodeInfo); + + Container container = + new Container(nodeInfo.nodeId, nodeInfo.nodeAddress, nodeInfo.nodeHostname); + container.setAvailableResource(nodeInfo.resources); + + //Create ray resource. + Ray.setResource(container.getNodeId(), + container.getName(), + resources.getMaxActorNumPerContainer()); + //Mark container is already registered. + Ray.setResource(container.getNodeId(), + CONTAINER_ENGAGED_KEY, 1); + + // update register container list + resources.getRegisterContainers().add(container); + } + + private void unregisterContainer(final Container container) { + LOG.info("Unregister container {}.", container); + + // delete resource with capacity to 0 + Ray.setResource(container.getNodeId(), container.getName(), 0); + Ray.setResource(container.getNodeId(), CONTAINER_ENGAGED_KEY, 0); + + // remove from container map + resources.getRegisterContainers().remove(container); + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/SlotAssignStrategy.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/SlotAssignStrategy.java new file mode 100644 index 000000000..2a80a62fa --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/SlotAssignStrategy.java @@ -0,0 +1,42 @@ +package org.ray.streaming.runtime.core.master.scheduler.strategy; + +import java.util.List; +import java.util.Map; + +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; +import org.ray.streaming.runtime.core.resource.Container; +import org.ray.streaming.runtime.core.resource.ContainerID; +import org.ray.streaming.runtime.core.resource.Resources; +import org.ray.streaming.runtime.core.resource.Slot; + +/** + * The SlotAssignStrategy managers a set of slots. When a container is + * registered to ResourceManager, slots are assigned to it. + */ +public interface SlotAssignStrategy { + + /** + * Calculate slot number per container and set to resources. + */ + int getSlotNumPerContainer(List containers, int maxParallelism); + + /** + * Allocate slot to container + */ + void allocateSlot(final List containers, final int slotNumPerContainer); + + /** + * Assign slot to execution vertex + */ + Map> assignSlot(ExecutionGraph executionGraph); + + /** + * Get slot assign strategy name + */ + String getName(); + + /** + * Set resources. + */ + void setResources(Resources resources); +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/SlotAssignStrategyFactory.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/SlotAssignStrategyFactory.java new file mode 100644 index 000000000..4dc19b9a0 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/SlotAssignStrategyFactory.java @@ -0,0 +1,24 @@ +package org.ray.streaming.runtime.core.master.scheduler.strategy; + +import org.ray.streaming.runtime.config.types.SlotAssignStrategyType; +import org.ray.streaming.runtime.core.master.scheduler.strategy.impl.PipelineFirstStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SlotAssignStrategyFactory { + + private static final Logger LOG = LoggerFactory.getLogger(SlotAssignStrategyFactory.class); + + public static SlotAssignStrategy getStrategy(final SlotAssignStrategyType type) { + SlotAssignStrategy strategy = null; + LOG.info("Slot assign strategy is: {}.", type); + switch (type) { + case PIPELINE_FIRST_STRATEGY: + strategy = new PipelineFirstStrategy(); + break; + default: + throw new RuntimeException("strategy config error, no impl found for " + strategy); + } + return strategy; + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/impl/PipelineFirstStrategy.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/impl/PipelineFirstStrategy.java new file mode 100644 index 000000000..ee99be986 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/master/scheduler/strategy/impl/PipelineFirstStrategy.java @@ -0,0 +1,205 @@ +package org.ray.streaming.runtime.core.master.scheduler.strategy.impl; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.ray.streaming.runtime.config.types.SlotAssignStrategyType; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; +import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategy; +import org.ray.streaming.runtime.core.resource.Container; +import org.ray.streaming.runtime.core.resource.ContainerID; +import org.ray.streaming.runtime.core.resource.Resources; +import org.ray.streaming.runtime.core.resource.Slot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Based on Ray dynamic resource function, resource details(by ray gcs get) and + * execution logic diagram, PipelineFirstStrategy provides a actor scheduling + * strategies to make the cluster load balanced and controllable scheduling. + * Assume that we have 2 containers and have a DAG graph composed of a source node with parallelism + * of 2 and a sink node with parallelism of 2, the structure will be like: + *
+ *   container_0
+ *             |- source_1
+ *             |- sink_1
+ *   container_1
+ *             |- source_2
+ *             |- sink_2
+ * 
+ */ +public class PipelineFirstStrategy implements SlotAssignStrategy { + + public static final Logger LOG = LoggerFactory.getLogger(PipelineFirstStrategy.class); + + protected Resources resources; + + @Override + public int getSlotNumPerContainer(List containers, int maxParallelism) { + LOG.info("max parallelism: {}, container size: {}.", maxParallelism, containers.size()); + int slotNumPerContainer = + (int) Math.ceil(Math.max(maxParallelism, containers.size()) * 1.0 / containers.size()); + LOG.info("slot num per container: {}.", slotNumPerContainer); + return slotNumPerContainer; + } + + /** + * Allocate slot to target container, assume that we have 2 containers and max parallelism is 5, + * the structure will be like: + *
+   * container_0
+   *           |- slot_0
+   *           |- slot_2
+   *           |- slot_4
+   * container_1
+   *           |- slot_1
+   *           |- slot_3
+   *           |- slot_5
+   * 
+ */ + @Override + public void allocateSlot(List containers, + int slotNumPerContainer) { + int maxSlotSize = containers.size() * slotNumPerContainer; + LOG.info("Allocate slot, maxSlotSize: {}.", maxSlotSize); + + for (int slotId = 0; slotId < maxSlotSize; ++slotId) { + Container targetContainer = containers.get(slotId % containers.size()); + Slot slot = new Slot(slotId, targetContainer.getContainerId()); + targetContainer.getSlots().add(slot); + } + + // update new added containers' allocating map + containers.forEach(c -> { + List slots = c.getSlots(); + resources.getAllocatingMap().put(c.getContainerId(), slots); + }); + + LOG.info("Allocate slot result: {}.", resources.getAllocatingMap()); + } + + @Override + public Map> assignSlot(ExecutionGraph executionGraph) { + LOG.info("Container available resources: {}.", resources.getAllAvailableResource()); + Map vertices = executionGraph.getExecutionJobVertexMap(); + Map vertexRemainingNum = new HashMap<>(); + vertices.forEach((k, v) -> { + int size = v.getExecutionVertices().size(); + vertexRemainingNum.put(k, size); + }); + int totalExecutionVerticesNum = vertexRemainingNum.values().stream() + .mapToInt(Integer::intValue) + .sum(); + int containerNum = resources.getRegisterContainers().size(); + resources.setActorPerContainer((int) Math + .ceil(totalExecutionVerticesNum * 1.0 / containerNum)); + LOG.info("Total execution vertices num: {}, container num: {}, capacity per container: {}.", + totalExecutionVerticesNum, containerNum, resources.getActorPerContainer()); + + int maxParallelism = executionGraph.getMaxParallelism(); + + for (int i = 0; i < maxParallelism; i++) { + for (ExecutionJobVertex executionJobVertex : vertices.values()) { + List exeVertices = executionJobVertex.getExecutionVertices(); + // current job vertex assign finished + if (exeVertices.size() <= i) { + continue; + } + + ExecutionVertex executionVertex = exeVertices.get(i); + + //check current container has enough resources. + checkResource(executionVertex.getResources()); + + Container targetContainer = resources.getRegisterContainers() + .get(resources.getCurrentContainerIndex()); + List targetSlots = targetContainer.getSlots(); + allocate(executionVertex, targetContainer, targetSlots.get(i % targetSlots.size())); + } + } + + return resources.getAllocatingMap(); + } + + private void checkResource(Map requiredResource) { + int checkedNum = 0; + // if current container does not have enough resource, go to the next one (loop) + while (!hasEnoughResource(requiredResource)) { + checkedNum++; + resources.setCurrentContainerIndex((resources.getCurrentContainerIndex() + 1) % + resources.getRegisterContainers().size()); + + Preconditions.checkArgument(checkedNum < resources.getRegisterContainers().size(), + "No enough resource left, required resource: {}, available resource: {}.", + requiredResource, resources.getAllAvailableResource()); + resources.setCurrentContainerAllocatedActorNum(0); + } + } + + private boolean hasEnoughResource(Map requiredResource) { + LOG.info("Check resource for container, index: {}.", resources.getCurrentContainerIndex()); + + if (null == requiredResource) { + return true; + } + + Container currentContainer = resources.getRegisterContainers() + .get(resources.getCurrentContainerIndex()); + List slotActors = resources.getAllocatingMap().get(currentContainer.getContainerId()); + if (slotActors != null && slotActors.size() > 0) { + long allocatedActorNum = slotActors.stream() + .map(Slot::getExecutionVertexIds) + .mapToLong(List::size) + .sum(); + if (allocatedActorNum >= resources.getActorPerContainer()) { + LOG.info("Container remaining capacity is 0. used: {}, total: {}.", allocatedActorNum, + resources.getActorPerContainer()); + return false; + } + } + + Map availableResource = currentContainer.getAvailableResource(); + for (Map.Entry entry : requiredResource.entrySet()) { + if (availableResource.containsKey(entry.getKey())) { + if (availableResource.get(entry.getKey()) < entry.getValue()) { + LOG.warn("No enough resource for container {}. required: {}, available: {}.", + currentContainer.getAddress(), requiredResource, availableResource); + return false; + } + } + } + return true; + } + + private void allocate(ExecutionVertex vertex, Container container, Slot slot) { + // set slot for execution vertex + LOG.info("Set slot {} to vertex {}.", slot, vertex); + vertex.setSlotIfNotExist(slot); + + Slot useSlot = resources.getAllocatingMap().get(container.getContainerId()) + .stream().filter(s -> s.getId() == slot.getId()).findFirst().get(); + useSlot.getExecutionVertexIds().add(vertex.getVertexId()); + + // current container reaches capacity limitation, go to the next one. + resources.setCurrentContainerAllocatedActorNum( + resources.getCurrentContainerAllocatedActorNum() + 1); + if (resources.getCurrentContainerAllocatedActorNum() >= resources.getActorPerContainer()) { + resources.setCurrentContainerIndex( + (resources.getCurrentContainerIndex() + 1) % resources.getRegisterContainers().size()); + resources.setCurrentContainerAllocatedActorNum(0); + } + } + + @Override + public String getName() { + return SlotAssignStrategyType.PIPELINE_FIRST_STRATEGY.getValue(); + } + + @Override + public void setResources(Resources resources) { + this.resources = resources; + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Container.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Container.java new file mode 100644 index 000000000..c8dc3552e --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Container.java @@ -0,0 +1,86 @@ +package org.ray.streaming.runtime.core.resource; + +import com.google.common.base.MoreObjects; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.ray.api.id.UniqueId; + +/** + * Resource manager unit abstraction. + * Container identifies the available resource(cpu,mem) and allocated slots. + */ +public class Container implements Serializable { + + private ContainerID containerId; + private UniqueId nodeId; + private String address; + private String hostname; + + private Map availableResource = new HashMap<>(); + private List slots = new ArrayList<>(); + + public Container() { + } + + public Container(UniqueId nodeId, String address, String hostname) { + this.containerId = new ContainerID(); + this.nodeId = nodeId; + this.address = address; + this.hostname = hostname; + } + + public void setContainerId(ContainerID containerId) { + this.containerId = containerId; + } + + public ContainerID getContainerId() { + return containerId; + } + + public String getName() { + return containerId.toString(); + } + + public String getAddress() { + return address; + } + + public UniqueId getNodeId() { + return nodeId; + } + + public String getHostname() { + return hostname; + } + + public Map getAvailableResource() { + return availableResource; + } + + public void setAvailableResource(Map availableResource) { + this.availableResource = availableResource; + } + + public List getSlots() { + return slots; + } + + public void setSlots(List slots) { + this.slots = slots; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("containerId", containerId) + .add("nodeId", nodeId) + .add("address", address) + .add("hostname", hostname) + .add("availableResource", availableResource) + .add("slots", slots) + .toString(); + } +} \ No newline at end of file diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/ContainerID.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/ContainerID.java new file mode 100644 index 000000000..1ea53280e --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/ContainerID.java @@ -0,0 +1,9 @@ +package org.ray.streaming.runtime.core.resource; + +import org.ray.streaming.runtime.core.common.AbstractID; + +/** + * Container unique identifier. + */ +public class ContainerID extends AbstractID { +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Resources.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Resources.java new file mode 100644 index 000000000..4d866e7f2 --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Resources.java @@ -0,0 +1,134 @@ +package org.ray.streaming.runtime.core.resource; + +import com.google.common.base.MoreObjects; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.ray.api.id.UniqueId; +import org.ray.streaming.runtime.config.master.ResourceConfig; + +/** + * Resource description of ResourceManager. + */ +public class Resources implements Serializable { + + /** + * Available containers registered to ResourceManager. + */ + private List registerContainers = new ArrayList<>(); + + /** + * Mapping of allocated container to slots. + */ + private Map> allocatingMap = new HashMap<>(16); + + /** + * Number of slots per container. + */ + private int slotNumPerContainer = 0; + + /** + * Number of actors per container. + */ + private int actorPerContainer = 0; + + /** + * Number of actors that the current container has allocated. + */ + private int currentContainerAllocatedActorNum = 0; + + /** + * The container index currently being allocated. + */ + private int currentContainerIndex = 0; + + private int maxActorNumPerContainer; + + + public Resources(ResourceConfig resourceConfig) { + maxActorNumPerContainer = resourceConfig.maxActorNumPerContainer(); + } + + public List getRegisterContainers() { + return registerContainers; + } + + public void setSlotNumPerContainer(int slotNumPerContainer) { + this.slotNumPerContainer = slotNumPerContainer; + } + + public int getSlotNumPerContainer() { + return slotNumPerContainer; + } + + public void setRegisterContainers( + List registerContainers) { + this.registerContainers = registerContainers; + } + + public void setCurrentContainerIndex(int currentContainerIndex) { + this.currentContainerIndex = currentContainerIndex; + } + + public int getCurrentContainerIndex() { + return currentContainerIndex; + } + + public void setCurrentContainerAllocatedActorNum(int currentContainerAllocatedActorNum) { + this.currentContainerAllocatedActorNum = currentContainerAllocatedActorNum; + } + + public int getCurrentContainerAllocatedActorNum() { + return currentContainerAllocatedActorNum; + } + + public int getActorPerContainer() { + return actorPerContainer; + } + + public void setActorPerContainer(int actorPerContainer) { + this.actorPerContainer = actorPerContainer; + } + + public Container getRegisterContainerByContainerId(ContainerID containerID) { + return registerContainers.stream() + .filter(container -> container.getContainerId().equals(containerID)) + .findFirst().get(); + } + + public int getMaxActorNumPerContainer() { + return maxActorNumPerContainer; + } + + public Map> getAllocatingMap() { + return allocatingMap; + } + + public void setAllocatingMap( + Map> allocatingMap) { + this.allocatingMap = allocatingMap; + } + + public Map> getAllAvailableResource() { + Map> availableResource = new HashMap<>(); + for (Container container : registerContainers) { + availableResource.put(container.getNodeId(), container.getAvailableResource()); + } + return availableResource; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("registerContainers", registerContainers) + .add("allocatingMap", allocatingMap) + .add("slotNumPerContainer", slotNumPerContainer) + .add("actorPerContainer", actorPerContainer) + .add("currentContainerAllocatedActorNum", currentContainerAllocatedActorNum) + .add("currentContainerIndex", currentContainerIndex) + .add("maxActorNumPerContainer", maxActorNumPerContainer) + .toString(); + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Slot.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Slot.java new file mode 100644 index 000000000..daa59571e --- /dev/null +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/core/resource/Slot.java @@ -0,0 +1,54 @@ +package org.ray.streaming.runtime.core.resource; + +import com.google.common.base.MoreObjects; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class Slot implements Serializable { + private int id; + /** + * The slot belongs to a container. + */ + private ContainerID containerID; + private AtomicInteger actorCount = new AtomicInteger(0); + /** + * List of ExecutionVertex ids belong to the slot. + */ + private List executionVertexIds = new ArrayList<>(); + + public Slot(int id, ContainerID containerID) { + this.id = id; + this.containerID = containerID; + } + + public int getId() { + return id; + } + + public ContainerID getContainerID() { + return containerID; + } + + public AtomicInteger getActorCount() { + return actorCount; + } + + public List getExecutionVertexIds() { + return executionVertexIds; + } + + public void setExecutionVertexIds(List executionVertexIds) { + this.executionVertexIds = executionVertexIds; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("id", id) + .add("containerID", containerID) + .add("actorCount", actorCount) + .toString(); + } +} diff --git a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java index e8453201b..a17c9940a 100644 --- a/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java +++ b/streaming/java/streaming-runtime/src/main/java/org/ray/streaming/runtime/master/graphmanager/GraphManagerImpl.java @@ -50,7 +50,8 @@ public class GraphManagerImpl implements GraphManager { long buildTime = executionGraph.getBuildTime(); for (JobVertex jobVertex : jobGraph.getJobVertexList()) { int jobVertexId = jobVertex.getVertexId(); - exeJobVertexMap.put(jobVertexId, new ExecutionJobVertex(jobVertex)); + exeJobVertexMap.put(jobVertexId, + new ExecutionJobVertex(jobVertex, runtimeContext)); } // connect vertex diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/graph/ExecutionGraphTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/graph/ExecutionGraphTest.java index dd7c5e591..0e0baa083 100644 --- a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/graph/ExecutionGraphTest.java +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/graph/ExecutionGraphTest.java @@ -1,7 +1,13 @@ package org.ray.streaming.runtime.graph; import com.google.common.collect.Lists; + + +import java.util.HashMap; import java.util.List; +import java.util.Map; + + import org.ray.streaming.api.context.StreamingContext; import org.ray.streaming.api.stream.DataStream; import org.ray.streaming.api.stream.DataStreamSource; @@ -9,6 +15,7 @@ import org.ray.streaming.api.stream.StreamSink; import org.ray.streaming.jobgraph.JobGraph; import org.ray.streaming.jobgraph.JobGraphBuilder; import org.ray.streaming.runtime.BaseUnitTest; +import org.ray.streaming.runtime.config.StreamingConfig; import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionJobVertex; import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex; @@ -26,7 +33,9 @@ public class ExecutionGraphTest extends BaseUnitTest { @Test public void testBuildExecutionGraph() { - GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(null)); + Map jobConf = new HashMap<>(); + StreamingConfig streamingConfig = new StreamingConfig(jobConf); + GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(streamingConfig)); JobGraph jobGraph = buildJobGraph(); ExecutionGraph executionGraph = buildExecutionGraph(graphManager, jobGraph); List executionJobVertices = executionGraph.getExecutionJobVertexLices(); diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/resourcemanager/ResourceManagerTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/resourcemanager/ResourceManagerTest.java new file mode 100644 index 000000000..77bdef73c --- /dev/null +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/resourcemanager/ResourceManagerTest.java @@ -0,0 +1,99 @@ +package org.ray.streaming.runtime.resourcemanager; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.aeonbits.owner.util.Collections; +import org.ray.api.Ray; +import org.ray.streaming.jobgraph.JobGraph; +import org.ray.streaming.runtime.BaseUnitTest; +import org.ray.streaming.runtime.TestHelper; +import org.ray.streaming.runtime.config.StreamingConfig; +import org.ray.streaming.runtime.config.global.CommonConfig; +import org.ray.streaming.runtime.config.master.ResourceConfig; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; +import org.ray.streaming.runtime.core.master.resourcemanager.ResourceManager; +import org.ray.streaming.runtime.core.master.resourcemanager.ResourceManagerImpl; +import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategy; +import org.ray.streaming.runtime.core.master.scheduler.strategy.impl.PipelineFirstStrategy; +import org.ray.streaming.runtime.core.resource.Container; +import org.ray.streaming.runtime.core.resource.ContainerID; +import org.ray.streaming.runtime.core.resource.Slot; +import org.ray.streaming.runtime.graph.ExecutionGraphTest; +import org.ray.streaming.runtime.master.JobRuntimeContext; +import org.ray.streaming.runtime.master.graphmanager.GraphManager; +import org.ray.streaming.runtime.master.graphmanager.GraphManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ResourceManagerTest extends BaseUnitTest { + + private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerTest.class); + + @org.testng.annotations.BeforeClass + public void setUp() { + // ray init + Ray.init(); + TestHelper.setUTFlag(); + } + + @org.testng.annotations.AfterClass + public void tearDown() { + TestHelper.clearUTFlag(); + } + + @Test + public void testApi() { + Map conf = new HashMap(); + conf.put(CommonConfig.JOB_NAME, "testApi"); + conf.put(ResourceConfig.TASK_RESOURCE_CPU_LIMIT_ENABLE, "true"); + conf.put(ResourceConfig.TASK_RESOURCE_MEM_LIMIT_ENABLE, "true"); + conf.put(ResourceConfig.TASK_RESOURCE_MEM, "10"); + conf.put(ResourceConfig.TASK_RESOURCE_CPU, "2"); + StreamingConfig config = new StreamingConfig(conf); + JobRuntimeContext jobRuntimeContext = new JobRuntimeContext(config); + ResourceManager resourceManager = new ResourceManagerImpl(jobRuntimeContext); + + SlotAssignStrategy slotAssignStrategy = resourceManager.getSlotAssignStrategy(); + Assert.assertTrue(slotAssignStrategy instanceof PipelineFirstStrategy); + + Map containerResource = new HashMap<>(); + containerResource.put(ResourceConfig.RESOURCE_KEY_CPU, 16.0); + containerResource.put(ResourceConfig.RESOURCE_KEY_MEM, 128.0); + Container container1 = new Container(null, "testAddress1", "testHostName1"); + container1.setAvailableResource(containerResource); + Container container2 = new Container(null, "testAddress2", "testHostName2"); + container2.setAvailableResource(new HashMap<>(containerResource)); + List containers = Collections.list(container1, container2); + resourceManager.getResources().getRegisterContainers().addAll(containers); + Assert.assertEquals(resourceManager.getRegisteredContainers().size(), 2); + + //build ExecutionGraph + GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(config)); + JobGraph jobGraph = ExecutionGraphTest.buildJobGraph(); + ExecutionGraph executionGraph = ExecutionGraphTest.buildExecutionGraph(graphManager, jobGraph); + + int slotNumPerContainer = slotAssignStrategy.getSlotNumPerContainer(containers, executionGraph + .getMaxParallelism()); + Assert.assertEquals(slotNumPerContainer, 1); + + slotAssignStrategy.allocateSlot(containers, slotNumPerContainer); + + Map> allocatingMap = slotAssignStrategy.assignSlot(executionGraph); + Assert.assertEquals(allocatingMap.size(), 2); + + executionGraph.getAllAddedExecutionVertices().forEach(vertex -> { + Container container = resourceManager.getResources() + .getRegisterContainerByContainerId(vertex.getSlot().getContainerID()); + Map resource = resourceManager.allocateResource(container, vertex.getResources()); + Assert.assertNotNull(resource); + }); + Assert.assertEquals(container1.getAvailableResource().get(ResourceConfig.RESOURCE_KEY_CPU), 14.0); + Assert.assertEquals(container2.getAvailableResource().get(ResourceConfig.RESOURCE_KEY_CPU), 14.0); + Assert.assertEquals(container1.getAvailableResource().get(ResourceConfig.RESOURCE_KEY_MEM), 118.0); + Assert.assertEquals(container2.getAvailableResource().get(ResourceConfig.RESOURCE_KEY_MEM), 118.0); + } +} diff --git a/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/strategy/PipelineFirstStrategyTest.java b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/strategy/PipelineFirstStrategyTest.java new file mode 100644 index 000000000..66b6cb66d --- /dev/null +++ b/streaming/java/streaming-runtime/src/test/java/org/ray/streaming/runtime/schedule/strategy/PipelineFirstStrategyTest.java @@ -0,0 +1,108 @@ +package org.ray.streaming.runtime.schedule.strategy; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + + +import com.google.common.collect.Lists; +import org.aeonbits.owner.ConfigFactory; +import org.ray.api.RayActor; +import org.ray.api.id.ActorId; +import org.ray.api.id.ObjectId; +import org.ray.api.id.UniqueId; +import org.ray.runtime.actor.LocalModeRayActor; +import org.ray.streaming.api.context.RuntimeContext; +import org.ray.streaming.api.context.StreamingContext; +import org.ray.streaming.api.stream.DataStream; +import org.ray.streaming.api.stream.DataStreamSink; +import org.ray.streaming.api.stream.DataStreamSource; +import org.ray.streaming.jobgraph.JobGraph; +import org.ray.streaming.jobgraph.JobGraphBuilder; +import org.ray.streaming.runtime.BaseUnitTest; +import org.ray.streaming.runtime.config.StreamingConfig; +import org.ray.streaming.runtime.config.StreamingMasterConfig; +import org.ray.streaming.runtime.config.master.ResourceConfig; +import org.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph; +import org.ray.streaming.runtime.core.master.scheduler.strategy.SlotAssignStrategy; +import org.ray.streaming.runtime.core.master.scheduler.strategy.impl.PipelineFirstStrategy; +import org.ray.streaming.runtime.core.resource.Container; +import org.ray.streaming.runtime.core.resource.ContainerID; +import org.ray.streaming.runtime.core.resource.Resources; +import org.ray.streaming.runtime.core.resource.Slot; +import org.ray.streaming.runtime.graph.ExecutionGraphTest; +import org.ray.streaming.runtime.master.JobRuntimeContext; +import org.ray.streaming.runtime.master.graphmanager.GraphManager; +import org.ray.streaming.runtime.master.graphmanager.GraphManagerImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class PipelineFirstStrategyTest extends BaseUnitTest { + + private Logger LOG = LoggerFactory.getLogger(PipelineFirstStrategyTest.class); + + private SlotAssignStrategy strategy; + private List containers = new ArrayList<>(); + private JobGraph jobGraph; + private ExecutionGraph executionGraph; + private int maxParallelism; + + @BeforeClass + public void setUp() { + strategy = new PipelineFirstStrategy(); + Map conf = new HashMap<>(); + ResourceConfig resourceConfig = ConfigFactory.create(ResourceConfig.class, conf); + Resources resources = new Resources(resourceConfig); + + Map containerResource = new HashMap<>(); + containerResource.put(ResourceConfig.RESOURCE_KEY_CPU, 16.0); + containerResource.put(ResourceConfig.RESOURCE_KEY_MEM, 128.0); + for (int i = 0; i < 2; ++i) { + UniqueId uniqueId = UniqueId.randomId(); + Container container = new Container(uniqueId, "1.1.1." + i, "localhost" + i); + container.setAvailableResource(containerResource); + containers.add(container); + resources.getRegisterContainers().add(container); + } + strategy.setResources(resources); + + //build ExecutionGraph + Map jobConf = new HashMap<>(); + StreamingConfig streamingConfig = new StreamingConfig(jobConf); + GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(streamingConfig)); + jobGraph = ExecutionGraphTest.buildJobGraph(); + executionGraph = ExecutionGraphTest.buildExecutionGraph(graphManager, jobGraph); + maxParallelism = executionGraph.getMaxParallelism(); + } + + @Test + public int testSlotNumPerContainer() { + int slotNumPerContainer = strategy.getSlotNumPerContainer(containers, maxParallelism); + Assert.assertEquals(slotNumPerContainer, + (int) Math.ceil(Math.max(maxParallelism, containers.size()) * 1.0 / containers.size())); + return slotNumPerContainer; + } + + @Test + public void testAllocateSlot() { + int slotNumPerContainer = testSlotNumPerContainer(); + strategy.allocateSlot(containers, slotNumPerContainer); + for (Container container : containers) { + Assert.assertEquals(container.getSlots().size(), slotNumPerContainer); + } + } + + @Test + public void testAssignSlot() { + Map> allocatingMap = strategy.assignSlot(executionGraph); + for (Entry> containerSlotEntry : allocatingMap.entrySet()) { + containerSlotEntry.getValue() + .forEach(slot -> Assert.assertNotEquals(slot.getExecutionVertexIds().size(), 0)); + } + } +}