[streaming]Add master and scheduler. (#8044)

This commit is contained in:
Tianyi Chen 2020-04-22 14:43:56 +08:00 committed by GitHub
parent c486b56c58
commit 0204dff1e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 837 additions and 52 deletions

View file

@ -113,7 +113,6 @@ define_java_module(
"@ray_streaming_maven//:org_testng_testng",
"@ray_streaming_maven//:org_mockito_mockito_all",
"@ray_streaming_maven//:org_powermock_powermock_api_mockito",
"@ray_streaming_maven//:org_powermock_powermock_core",
"@ray_streaming_maven//:org_powermock_powermock_module_testng",
"@ray_streaming_maven//:org_projectlombok_lombok",
],
@ -127,10 +126,7 @@ define_java_module(
"@ray_streaming_maven//:com_google_protobuf_protobuf_java",
"@ray_streaming_maven//:de_ruedigermoeller_fst",
"@ray_streaming_maven//:org_aeonbits_owner_owner",
"@ray_streaming_maven//:org_mockito_mockito_all",
"@ray_streaming_maven//:org_msgpack_msgpack_core",
"@ray_streaming_maven//:org_powermock_powermock_api_mockito",
"@ray_streaming_maven//:org_powermock_powermock_module_testng",
"@ray_streaming_maven//:org_projectlombok_lombok",
"@ray_streaming_maven//:org_slf4j_slf4j_api",
"@ray_streaming_maven//:org_slf4j_slf4j_log4j12",

View file

@ -1,11 +1,10 @@
package io.ray.streaming.runtime.config;
import java.io.Serializable;
import javax.accessibility.Accessible;
import org.aeonbits.owner.Accessible;
/**
* Basic config interface.
*/
public interface Config extends org.aeonbits.owner.Config, Accessible, Serializable {
public interface Config extends org.aeonbits.owner.Config, Accessible {
}

View file

@ -1,6 +1,7 @@
package io.ray.streaming.runtime.config;
import io.ray.streaming.runtime.config.master.ResourceConfig;
import io.ray.streaming.runtime.config.master.SchedulerConfig;
import java.util.Map;
import org.aeonbits.owner.ConfigFactory;
import org.slf4j.Logger;
@ -14,9 +15,11 @@ public class StreamingMasterConfig extends StreamingGlobalConfig {
private static final Logger LOG = LoggerFactory.getLogger(StreamingMasterConfig.class);
public ResourceConfig resourceConfig;
public SchedulerConfig schedulerConfig;
public StreamingMasterConfig(final Map<String, String> conf) {
super(conf);
this.resourceConfig = ConfigFactory.create(ResourceConfig.class, conf);
this.schedulerConfig = ConfigFactory.create(SchedulerConfig.class, conf);
}
}

View file

@ -0,0 +1,33 @@
package io.ray.streaming.runtime.config.master;
import io.ray.streaming.runtime.config.Config;
/**
* Configuration for job scheduler.
*/
public interface SchedulerConfig extends Config {
String WORKER_INITIATION_WAIT_TIMEOUT_MS = "streaming.scheduler.worker.initiation.timeout.ms";
String WORKER_STARTING_WAIT_TIMEOUT_MS = "streaming.scheduler.worker.starting.timeout.ms";
/**
* The timeout ms of worker initiation.
* Default is: 10000ms(10s).
*
* @return timeout ms
*/
@Key(WORKER_INITIATION_WAIT_TIMEOUT_MS)
@DefaultValue(value = "10000")
int workerInitiationWaitTimeoutMs();
/**
* The timeout ms of worker starting.
* Default is: 10000ms(10s).
*
* @return timeout ms
*/
@Key(WORKER_STARTING_WAIT_TIMEOUT_MS)
@DefaultValue(value = "10000")
int workerStartingWaitTimeoutMs();
}

View file

@ -1,5 +1,7 @@
package io.ray.streaming.runtime.core.graph.executiongraph;
import io.ray.api.RayActor;
import io.ray.streaming.runtime.worker.JobWorker;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@ -136,4 +138,68 @@ public class ExecutionGraph implements Serializable {
throw new RuntimeException("Vertex " + vertexId + " does not exist!");
}
/**
* Get all actors by graph.
*
* @return actor list
*/
public List<RayActor<JobWorker>> getAllActors() {
return getActorsFromJobVertices(getExecutionJobVertexList());
}
/**
* Get source actors by graph.
*
* @return actor list
*/
public List<RayActor<JobWorker>> getSourceActors() {
List<ExecutionJobVertex> executionJobVertices = getExecutionJobVertexList().stream()
.filter(ExecutionJobVertex::isSourceVertex)
.collect(Collectors.toList());
return getActorsFromJobVertices(executionJobVertices);
}
/**
* Get transformation and sink actors by graph.
*
* @return actor list
*/
public List<RayActor<JobWorker>> getNonSourceActors() {
List<ExecutionJobVertex> executionJobVertices = getExecutionJobVertexList().stream()
.filter(executionJobVertex -> executionJobVertex.isTransformationVertex()
|| executionJobVertex.isSinkVertex())
.collect(Collectors.toList());
return getActorsFromJobVertices(executionJobVertices);
}
/**
* Get sink actors by graph.
*
* @return actor list
*/
public List<RayActor<JobWorker>> getSinkActors() {
List<ExecutionJobVertex> executionJobVertices = getExecutionJobVertexList().stream()
.filter(ExecutionJobVertex::isSinkVertex)
.collect(Collectors.toList());
return getActorsFromJobVertices(executionJobVertices);
}
/**
* Get actors according to job vertices.
*
* @param executionJobVertices specified job vertices
* @return actor list
*/
public List<RayActor<JobWorker>> getActorsFromJobVertices(
List<ExecutionJobVertex> executionJobVertices) {
return executionJobVertices.stream()
.map(ExecutionJobVertex::getExecutionVertices)
.flatMap(Collection::stream)
.map(ExecutionVertex::getWorkerActor)
.collect(Collectors.toList());
}
}

View file

@ -15,6 +15,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Physical vertex, correspond to {@link ExecutionJobVertex}.
@ -190,6 +191,11 @@ public class ExecutionVertex implements Serializable {
return false;
}
@Override
public int hashCode() {
return Objects.hash(id, outputEdges);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)

View file

@ -0,0 +1,124 @@
package io.ray.streaming.runtime.master;
import com.google.common.base.Preconditions;
import io.ray.api.RayActor;
import io.ray.streaming.jobgraph.JobGraph;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.config.StreamingMasterConfig;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.master.graphmanager.GraphManager;
import io.ray.streaming.runtime.master.graphmanager.GraphManagerImpl;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManagerImpl;
import io.ray.streaming.runtime.master.scheduler.JobSchedulerImpl;
import io.ray.streaming.runtime.worker.JobWorker;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* JobMaster is the core controller in streaming job as a ray actor. It is responsible for all the
* controls facing the {@link JobWorker}.
*/
public class JobMaster {
private static final Logger LOG = LoggerFactory.getLogger(JobMaster.class);
private JobRuntimeContext runtimeContext;
private ResourceManager resourceManager;
private JobSchedulerImpl scheduler;
private GraphManager graphManager;
private StreamingMasterConfig conf;
private RayActor jobMasterActor;
public JobMaster(Map<String, String> confMap) {
LOG.info("Creating job master with conf: {}.", confMap);
StreamingConfig streamingConfig = new StreamingConfig(confMap);
this.conf = streamingConfig.masterConfig;
// init runtime context
runtimeContext = new JobRuntimeContext(streamingConfig);
LOG.info("Finished creating job master.");
}
/**
* Init JobMaster. To initiate or recover other components(like metrics and extra coordinators).
*
* @return init result
*/
public Boolean init() {
LOG.info("Initializing job master.");
if (this.runtimeContext.getExecutionGraph() == null) {
LOG.error("Init job master failed. Job graphs is null.");
return false;
}
ExecutionGraph executionGraph = graphManager.getExecutionGraph();
Preconditions.checkArgument(executionGraph != null, "no execution graph");
LOG.info("Finished initializing job master.");
return true;
}
/**
* Submit job to run:
* <ol>
* <li> Using GraphManager to build physical plan according to the logical plan.</li>
* <li> Using ResourceManager to manage and allocate the resources.</li>
* <li> Using JobScheduler to schedule the job to run.</li>
* </ol>
*
* @param jobMasterActor JobMaster actor
* @param jobGraph logical plan
* @return submit result
*/
public boolean submitJob(RayActor<JobMaster> jobMasterActor, JobGraph jobGraph) {
LOG.info("Begin submitting job using logical plan: {}.", jobGraph);
this.jobMasterActor = jobMasterActor;
// init manager
graphManager = new GraphManagerImpl(runtimeContext);
resourceManager = new ResourceManagerImpl(runtimeContext);
// build and set graph into runtime context
ExecutionGraph executionGraph = graphManager.buildExecutionGraph(jobGraph);
runtimeContext.setJobGraph(jobGraph);
runtimeContext.setExecutionGraph(executionGraph);
// init scheduler
try {
scheduler = new JobSchedulerImpl(this);
scheduler.scheduleJob(graphManager.getExecutionGraph());
} catch (Exception e) {
LOG.error("Failed to submit job.", e);
return false;
}
return true;
}
public RayActor getJobMasterActor() {
return jobMasterActor;
}
public JobRuntimeContext getRuntimeContext() {
return runtimeContext;
}
public ResourceManager getResourceManager() {
return resourceManager;
}
public GraphManager getGraphManager() {
return graphManager;
}
public StreamingMasterConfig getConf() {
return conf;
}
}

View file

@ -0,0 +1,16 @@
package io.ray.streaming.runtime.master.scheduler;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
/**
* Job scheduler is used to do the scheduling in JobMaster.
*/
public interface JobScheduler {
/**
* Schedule streaming job using the physical plan.
* @param executionGraph physical plan
* @return scheduling result
*/
boolean scheduleJob(ExecutionGraph executionGraph);
}

View file

@ -0,0 +1,201 @@
package io.ray.streaming.runtime.master.scheduler;
import io.ray.api.RayActor;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.master.graphmanager.GraphManager;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import io.ray.streaming.runtime.master.resourcemanager.ViewBuilder;
import io.ray.streaming.runtime.master.scheduler.controller.WorkerLifecycleController;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Job scheduler implementation.
*/
public class JobSchedulerImpl implements JobScheduler {
private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
private StreamingConfig jobConf;
private final JobMaster jobMaster;
private final ResourceManager resourceManager;
private final GraphManager graphManager;
private final WorkerLifecycleController workerLifecycleController;
public JobSchedulerImpl(JobMaster jobMaster) {
this.jobMaster = jobMaster;
this.graphManager = jobMaster.getGraphManager();
this.resourceManager = jobMaster.getResourceManager();
this.workerLifecycleController = new WorkerLifecycleController();
this.jobConf = jobMaster.getRuntimeContext().getConf();
LOG.info("Scheduler initiated.");
}
@Override
public boolean scheduleJob(ExecutionGraph executionGraph) {
LOG.info("Begin scheduling. Job: {}.", executionGraph.getJobName());
// Allocate resource then create workers
prepareResourceAndCreateWorker(executionGraph);
// init worker context and start to run
initAndStart(executionGraph);
return true;
}
/**
* Allocating job worker resource then create job worker actor
*
* @param executionGraph
*/
protected void prepareResourceAndCreateWorker(ExecutionGraph executionGraph) {
List<Container> containers = resourceManager.getRegisteredContainers();
// Assign resource for execution vertices
resourceManager.assignResource(containers, executionGraph);
LOG.info("Allocating map is: {}.", ViewBuilder.buildResourceAssignmentView(containers));
// Start all new added workers
createWorkers(executionGraph);
}
/**
* Init JobMaster and JobWorkers then start JobWorkers.
*
* @param executionGraph physical plan
*/
private void initAndStart(ExecutionGraph executionGraph) {
// generate vertex - context map
Map<ExecutionVertex, JobWorkerContext> vertexToContextMap = buildWorkersContext(executionGraph);
// init workers
initWorkers(vertexToContextMap);
// init master
initMaster();
// start workers
startWorkers(executionGraph);
}
/**
* Create JobWorker actors according to the physical plan.
*
* @param executionGraph physical plan
* @return actor creation result
*/
public boolean createWorkers(ExecutionGraph executionGraph) {
LOG.info("Begin creating workers.");
long startTs = System.currentTimeMillis();
// create JobWorker actors
boolean createResult = workerLifecycleController
.createWorkers(executionGraph.getAllAddedExecutionVertices());
if (createResult) {
LOG.info("Finished creating workers. Cost {} ms.", System.currentTimeMillis() - startTs);
return true;
} else {
LOG.error("Failed to create workers. Cost {} ms.", System.currentTimeMillis() - startTs);
return false;
}
}
/**
* Init JobWorkers according to the vertex and context infos.
*
* @param vertexToContextMap vertex - context map
*/
protected boolean initWorkers(Map<ExecutionVertex, JobWorkerContext> vertexToContextMap) {
boolean result;
try {
result = workerLifecycleController.initWorkers(vertexToContextMap,
jobConf.masterConfig.schedulerConfig.workerInitiationWaitTimeoutMs());
} catch (Exception e) {
LOG.error("Failed to initiate workers.", e);
return false;
}
return result;
}
/**
* Start JobWorkers according to the physical plan.
*/
public boolean startWorkers(ExecutionGraph executionGraph) {
boolean result;
try {
result = workerLifecycleController.startWorkers(
executionGraph, jobConf.masterConfig.schedulerConfig.workerStartingWaitTimeoutMs());
} catch (Exception e) {
LOG.error("Failed to start workers.", e);
return false;
}
return result;
}
/**
* Build workers context.
*
* @param executionGraph execution graph
* @return vertex to worker context map
*/
protected Map<ExecutionVertex, JobWorkerContext> buildWorkersContext(
ExecutionGraph executionGraph) {
RayActor masterActor = jobMaster.getJobMasterActor();
// build workers' context
Map<ExecutionVertex, JobWorkerContext> needRegistryVertexToContextMap = new HashMap<>();
executionGraph.getAllExecutionVertices().forEach(vertex -> {
JobWorkerContext ctx = buildJobWorkerContext(vertex, masterActor);
needRegistryVertexToContextMap.put(vertex, ctx);
});
return needRegistryVertexToContextMap;
}
private JobWorkerContext buildJobWorkerContext(
ExecutionVertex executionVertex,
RayActor<JobMaster> masterActor) {
// create worker context
JobWorkerContext ctx = new JobWorkerContext(
executionVertex.getWorkerActorId(),
masterActor,
executionVertex
);
return ctx;
}
/**
* Destroy JobWorkers according to the vertex infos.
*
* @param executionVertices specified vertices
*/
public boolean destroyWorkers(List<ExecutionVertex> executionVertices) {
boolean result;
try {
result = workerLifecycleController.destroyWorkers(executionVertices);
} catch (Exception e) {
LOG.error("Failed to destroy workers.", e);
return false;
}
return result;
}
private void initMaster() {
jobMaster.init();
}
}

View file

@ -0,0 +1,184 @@
package io.ray.streaming.runtime.master.scheduler.controller;
import io.ray.api.Ray;
import io.ray.api.RayActor;
import io.ray.api.RayObject;
import io.ray.api.WaitResult;
import io.ray.api.id.ActorId;
import io.ray.api.options.ActorCreationOptions;
import io.ray.streaming.api.Language;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.rpc.RemoteCallWorker;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Worker lifecycle controller is used to control JobWorker's creation, initiation and so on.
*/
public class WorkerLifecycleController {
private static final Logger LOG = LoggerFactory.getLogger(WorkerLifecycleController.class);
public boolean createWorkers(List<ExecutionVertex> executionVertices) {
return asyncBatchExecute(this::createWorker, executionVertices);
}
/**
* Create JobWorker actor according to the execution vertex.
*
* @param executionVertex target execution vertex
* @return creation result
*/
private boolean createWorker(ExecutionVertex executionVertex) {
LOG.info("Start to create worker actor for vertex: {} with resource: {}.",
executionVertex.getVertexName(), executionVertex.getResources());
Language language = executionVertex.getLanguage();
ActorCreationOptions options = new ActorCreationOptions.Builder()
.setResources(executionVertex.getResources())
.setMaxReconstructions(ActorCreationOptions.INFINITE_RECONSTRUCTION)
.createActorCreationOptions();
RayActor<JobWorker> actor = null;
// TODO (datayjz): ray create actor
if (null == actor) {
LOG.error("Create worker actor failed.");
return false;
}
executionVertex.setWorkerActor(actor);
LOG.info("Worker actor created, actor: {}, vertex: {}.",
executionVertex.getWorkerActorId(), executionVertex.getVertexName());
return true;
}
/**
* Using context to init JobWorker.
*
* @param vertexToContextMap target JobWorker actor
* @param timeout timeout for waiting, unit: ms
* @return initiation result
*/
public boolean initWorkers(
Map<ExecutionVertex, JobWorkerContext> vertexToContextMap, int timeout) {
LOG.info("Begin initiating workers: {}.", vertexToContextMap);
long startTime = System.currentTimeMillis();
Map<RayObject<Boolean>, ActorId> rayObjects = new HashMap<>();
vertexToContextMap.entrySet().forEach((entry -> {
ExecutionVertex vertex = entry.getKey();
rayObjects.put(RemoteCallWorker.initWorker(vertex.getWorkerActor(), entry.getValue()),
vertex.getWorkerActorId());
}));
List<RayObject<Boolean>> rayObjectList = new ArrayList<>(rayObjects.keySet());
LOG.info("Waiting for workers' initialization.");
WaitResult<Boolean> result = Ray.wait(rayObjectList, rayObjectList.size(), timeout);
if (result.getReady().size() != rayObjectList.size()) {
LOG.error("Initializing workers timeout[{} ms].", timeout);
return false;
}
LOG.info("Finished waiting workers' initialization.");
LOG.info("Workers initialized. Cost {} ms.", System.currentTimeMillis() - startTime);
return true;
}
/**
* Start JobWorkers to run task.
*
* @param executionGraph physical plan
* @param timeout timeout for waiting, unit: ms
* @return starting result
*/
public boolean startWorkers(ExecutionGraph executionGraph, int timeout) {
LOG.info("Begin starting workers.");
long startTime = System.currentTimeMillis();
List<RayObject<Boolean>> rayObjects = new ArrayList<>();
// start source actors 1st
executionGraph.getSourceActors()
.forEach(actor -> rayObjects.add(RemoteCallWorker.startWorker(actor)));
// then start non-source actors
executionGraph.getNonSourceActors()
.forEach(actor -> rayObjects.add(RemoteCallWorker.startWorker(actor)));
WaitResult<Boolean> result = Ray.wait(rayObjects, rayObjects.size(), timeout);
if (result.getReady().size() != rayObjects.size()) {
LOG.error("Starting workers timeout[{} ms].", timeout);
return false;
}
LOG.info("Workers started. Cost {} ms.", System.currentTimeMillis() - startTime);
return true;
}
/**
* Stop and destroy JobWorkers' actor.
*
* @param executionVertices target vertices
* @return destroy result
*/
public boolean destroyWorkers(List<ExecutionVertex> executionVertices) {
return asyncBatchExecute(this::destroyWorker, executionVertices);
}
private boolean destroyWorker(ExecutionVertex executionVertex) {
RayActor rayActor = executionVertex.getWorkerActor();
LOG.info("Begin destroying worker[vertex={}, actor={}].",
executionVertex.getVertexName(), rayActor.getId());
boolean destroyResult = RemoteCallWorker.shutdownWithoutReconstruction(rayActor);
if (!destroyResult) {
LOG.error("Failed to destroy JobWorker[{}]'s actor: {}.",
executionVertex.getVertexName(), rayActor);
return false;
}
LOG.info("Worker destroyed, actor: {}.", rayActor);
return true;
}
/**
* Async batch execute function, for some cases that could not use Ray.wait
*
* @param operation the function to be executed
*/
private boolean asyncBatchExecute(
Function<ExecutionVertex, Boolean> operation,
List<ExecutionVertex> executionVertices) {
final Object asyncContext = Ray.getAsyncContext();
List<CompletableFuture<Boolean>> futureResults = executionVertices.stream()
.map(vertex -> CompletableFuture.supplyAsync(() -> {
Ray.setAsyncContext(asyncContext);
return operation.apply(vertex);
})).collect(Collectors.toList());
List<Boolean> succeeded = futureResults.stream().map(CompletableFuture::join)
.collect(Collectors.toList());
if (succeeded.stream().anyMatch(x -> !x)) {
LOG.error("Not all futures return true, check ResourceManager'log the detail.");
return false;
}
return true;
}
}

View file

@ -0,0 +1,69 @@
package io.ray.streaming.runtime.rpc;
import io.ray.api.RayActor;
import io.ray.api.RayObject;
import io.ray.streaming.runtime.master.JobMaster;
import io.ray.streaming.runtime.worker.JobWorker;
import io.ray.streaming.runtime.worker.context.JobWorkerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Ray call worker.
* It takes the communication job from {@link JobMaster} to {@link JobWorker}.
*/
public class RemoteCallWorker {
private static final Logger LOG = LoggerFactory.getLogger(RemoteCallWorker.class);
/**
* Call JobWorker actor to init.
*
* @param actor target JobWorker actor
* @param ctx JobWorker's context
* @return init result
*/
public static RayObject<Boolean> initWorker(RayActor actor, JobWorkerContext ctx) {
LOG.info("Call worker to init, actor: {}, context: {}.", actor.getId(), ctx);
RayObject<Boolean> result = null;
// TODO (datayjz): ray call worker to initiate
LOG.info("Finished calling worker to init.");
return result;
}
/**
* Call JobWorker actor to start.
*
* @param actor target JobWorker actor
* @return start result
*/
public static RayObject<Boolean> startWorker(RayActor actor) {
LOG.info("Call worker to start, actor: {}.", actor.getId());
RayObject<Boolean> result = null;
// TODO (datayjz): ray call worker to start
LOG.info("Finished calling worker to start.");
return result;
}
/**
* Call JobWorker actor to destroy without reconstruction.
*
* @param actor target JobWorker actor
* @return destroy result
*/
public static Boolean shutdownWithoutReconstruction(RayActor actor) {
LOG.info("Call worker to shutdown without reconstruction, actor is {}.",
actor.getId());
Boolean result = false;
// TODO (datayjz): ray call worker to destroy
LOG.info("Finished calling wk shutdownWithoutReconstruction, result is {}.", result);
return result;
}
}

View file

@ -3,6 +3,8 @@ package io.ray.streaming.runtime.util;
import io.ray.api.Ray;
import io.ray.api.id.UniqueId;
import io.ray.api.runtimecontext.NodeInfo;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -18,6 +20,10 @@ public class RayUtils {
* @return node info list
*/
public static List<NodeInfo> getAllNodeInfo() {
if (Ray.getRuntimeContext().isSingleProcess()) {
// only for single process(for unit test)
return mockContainerResources();
}
return Ray.getRuntimeContext().getAllNodeInfo();
}
@ -31,4 +37,25 @@ public class RayUtils {
.filter(nodeInfo -> nodeInfo.isAlive)
.collect(Collectors.toMap(nodeInfo -> nodeInfo.nodeId, nodeInfo -> nodeInfo));
}
private static List<NodeInfo> mockContainerResources() {
List<NodeInfo> nodeInfos = new LinkedList<>();
for (int i = 1; i <= 5; i++) {
Map<String, Double> resources = new HashMap<>();
resources.put("CPU", (double) i);
resources.put("MEM", 16.0);
byte[] nodeIdBytes = new byte[UniqueId.LENGTH];
for (int byteIndex = 0; byteIndex < UniqueId.LENGTH; ++byteIndex) {
nodeIdBytes[byteIndex] = String.valueOf(i).getBytes()[0];
}
NodeInfo nodeInfo = new NodeInfo(new UniqueId(nodeIdBytes),
"localhost" + i, "localhost" + i,
true, resources);
nodeInfos.add(nodeInfo);
}
return nodeInfos;
}
}

View file

@ -0,0 +1,59 @@
package io.ray.streaming.runtime.worker.context;
import com.google.common.base.MoreObjects;
import io.ray.api.RayActor;
import io.ray.api.id.ActorId;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionVertex;
import io.ray.streaming.runtime.master.JobMaster;
import java.io.Serializable;
/**
* Job worker context.
*/
public class JobWorkerContext implements Serializable {
/**
* Worker actor's id.
*/
private ActorId workerId;
/**
* JobMaster actor.
*/
private RayActor<JobMaster> master;
/**
* Worker's vertex info.
*/
private ExecutionVertex executionVertex;
public JobWorkerContext(
ActorId workerId,
RayActor<JobMaster> master,
ExecutionVertex executionVertex) {
this.workerId = workerId;
this.master = master;
this.executionVertex = executionVertex;
}
public ActorId getWorkerId() {
return workerId;
}
public RayActor<JobMaster> getMaster() {
return master;
}
public ExecutionVertex getExecutionVertex() {
return executionVertex;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("workerId", workerId)
.add("master", master)
.toString();
}
}

View file

@ -1,4 +1,4 @@
package io.ray.streaming.runtime.graph;
package io.ray.streaming.runtime.core.graph;
import com.google.common.collect.Lists;
import io.ray.streaming.api.context.StreamingContext;

View file

@ -0,0 +1,20 @@
package io.ray.streaming.runtime.master;
import java.util.HashMap;
import org.testng.Assert;
import org.testng.annotations.Test;
public class JobMasterTest {
@Test
public void testCreation() {
JobMaster jobMaster = new JobMaster(new HashMap<>());
Assert.assertNotNull(jobMaster.getRuntimeContext());
Assert.assertNotNull(jobMaster.getConf());
Assert.assertNull(jobMaster.getGraphManager());
Assert.assertNull(jobMaster.getResourceManager());
Assert.assertNull(jobMaster.getJobMasterActor());
Assert.assertFalse(jobMaster.init());
}
}

View file

@ -0,0 +1,12 @@
package io.ray.streaming.runtime.master.jobscheduler;
import org.testng.annotations.Test;
public class JobSchedulerTest {
@Test
public void testSchedule() {
// TODO (tianyi): need JobWorker Part to do this.
}
}

View file

@ -1,60 +1,34 @@
package io.ray.streaming.runtime.resourcemanager;
package io.ray.streaming.runtime.master.resourcemanager;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.mockito.MockitoAnnotations;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import io.ray.api.Ray;
import io.ray.api.id.UniqueId;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.streaming.runtime.BaseUnitTest;
import io.ray.streaming.runtime.config.StreamingConfig;
import io.ray.streaming.runtime.config.global.CommonConfig;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManager;
import io.ray.streaming.runtime.master.resourcemanager.ResourceManagerImpl;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.master.JobRuntimeContext;
import io.ray.streaming.runtime.util.Mockitools;
import io.ray.streaming.runtime.util.RayUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.IObjectFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
@PrepareForTest(RayUtils.class)
@PowerMockIgnore({"org.slf4j.*", "javax.xml.*"})
public class ResourceManagerTest {
public class ResourceManagerTest extends BaseUnitTest {
private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerTest.class);
private Object rayAsyncContext;
@ObjectFactory
public IObjectFactory getObjectFactory() {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}
@org.testng.annotations.BeforeClass
public void setUp() {
LOG.warn("Do set up");
MockitoAnnotations.initMocks(this);
}
@org.testng.annotations.AfterClass
public void tearDown() {
LOG.warn("Do tear down");
}
@BeforeMethod
public void mockGscApi() {
public void init() {
// ray init
Ray.init();
rayAsyncContext = Ray.getAsyncContext();
Mockitools.mockGscApi();
}
@Test
@ -63,7 +37,7 @@ public class ResourceManagerTest {
Assert.assertEquals(nodeInfoMap.size(), 5);
}
@Test
@Test(dependsOnMethods = "testGcsMockedApi")
public void testApi() {
Ray.setAsyncContext(rayAsyncContext);

View file

@ -1,4 +1,4 @@
package io.ray.streaming.runtime.schedule.strategy;
package io.ray.streaming.runtime.master.resourcemanager.strategy;
import io.ray.api.id.UniqueId;
import io.ray.streaming.jobgraph.JobGraph;
@ -8,12 +8,11 @@ import io.ray.streaming.runtime.config.types.ResourceAssignStrategyType;
import io.ray.streaming.runtime.core.graph.executiongraph.ExecutionGraph;
import io.ray.streaming.runtime.core.resource.Container;
import io.ray.streaming.runtime.core.resource.ResourceType;
import io.ray.streaming.runtime.graph.ExecutionGraphTest;
import io.ray.streaming.runtime.core.graph.ExecutionGraphTest;
import io.ray.streaming.runtime.master.JobRuntimeContext;
import io.ray.streaming.runtime.master.graphmanager.GraphManager;
import io.ray.streaming.runtime.master.graphmanager.GraphManagerImpl;
import io.ray.streaming.runtime.master.resourcemanager.ResourceAssignmentView;
import io.ray.streaming.runtime.master.resourcemanager.strategy.ResourceAssignStrategy;
import io.ray.streaming.runtime.master.resourcemanager.strategy.impl.PipelineFirstStrategy;
import java.util.ArrayList;
import java.util.HashMap;
@ -58,14 +57,11 @@ public class PipelineFirstStrategyTest extends BaseUnitTest {
}
@Test
public void testStrategyName() {
Assert
.assertEquals(ResourceAssignStrategyType.PIPELINE_FIRST_STRATEGY.getName(), strategy.getName());
}
@Test
public void testAssignResource() {
public void testResourceAssignment() {
strategy = new PipelineFirstStrategy();
Assert.assertEquals(
ResourceAssignStrategyType.PIPELINE_FIRST_STRATEGY.getName(), strategy.getName());
Map<String, String> jobConf = new HashMap<>();
StreamingConfig streamingConfig = new StreamingConfig(jobConf);
GraphManager graphManager = new GraphManagerImpl(new JobRuntimeContext(streamingConfig));