Support to auto-generate Java files from flatbuffer (#3749)

* auto gen flatbuffers for Java

* Add auto_gen_tool.py

* Refine

* Add a comment

* address comments.

* Address comments.

* Addressed

* Refine

* Address comments

* Fix typo

* Add exception

* Address comments.

* Refine

* Fix lint

* Fix

* Fix lint and address comment.

* Fix lint error
This commit is contained in:
Wang Qing 2019-01-14 03:39:23 +08:00 committed by Philipp Moritz
parent d2cf8561f2
commit 8674606e26
17 changed files with 213 additions and 392 deletions

1
.gitignore vendored
View file

@ -11,6 +11,7 @@
/src/ray/gcs/format/*_generated.h
/src/ray/object_manager/format/*_generated.h
/src/ray/raylet/format/*_generated.h
/java/runtime/src/main/java/org/ray/runtime/generated/*
# Modin source files
/python/ray/modin

View file

@ -9,6 +9,5 @@
<suppress checks="AbbreviationAsWordInNameCheck" files="RayParameters.java"/>
<suppress checks=".*" files="RayCall.java"/>
<!-- suppress check for flatbuffer-generated files. -->
<!-- TODO(raulchen): move these files to a directory, so this rule can be simplier. -->
<suppress checks=".*" files="(Arg|ResourcePair|Language|TaskInfo|ClientTableData).java" />
<suppress checks=".*" files="org[\\/]ray[\\/]runtime[\\/]generated[\\/]" />
</suppressions>

View file

@ -0,0 +1,55 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import sys
"""
This script is used for modifying the generated java flatbuffer
files for the reason: The package declaration in Java is different
from python and C++, and there is no option in the flatc command
to specify package(namepsace) for Java specially.
USAGE:
python modify_generated_java_flatbuffers_file.py RAY_HOME
RAY_HOME: The root directory of Ray project.
"""
# constants declarations
PACKAGE_DECLARATION = "package org.ray.runtime.generated;"
def add_new_line(file, line_num, text):
with open(file, "r") as file_handler:
lines = file_handler.readlines()
if (line_num <= 0) or (line_num > len(lines) + 1):
return False
lines.insert(line_num - 1, text + os.linesep)
with open(file, "w") as file_handler:
for line in lines:
file_handler.write(line)
return True
def add_package_declarations(generated_root_path):
file_names = os.listdir(generated_root_path)
for file_name in file_names:
if not file_name.endswith(".java"):
continue
full_name = os.path.join(generated_root_path, file_name)
success = add_new_line(full_name, 2, PACKAGE_DECLARATION)
if not success:
raise RuntimeError("Failed to add package declarations, "
"file name is %s" % full_name)
if __name__ == "__main__":
ray_home = sys.argv[1]
root_path = os.path.join(
ray_home,
"java/runtime/src/main/java/org/ray/runtime/generated")
add_package_declarations(root_path)

View file

@ -1,58 +0,0 @@
// automatically generated by the FlatBuffers compiler, do not modify
package org.ray.runtime.generated;
import com.google.flatbuffers.FlatBufferBuilder;
import com.google.flatbuffers.Table;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@SuppressWarnings("unused")
public final class Arg extends Table {
public static Arg getRootAsArg(ByteBuffer _bb) { return getRootAsArg(_bb, new Arg()); }
public static Arg getRootAsArg(ByteBuffer _bb, Arg obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
public Arg __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public String objectIds(int j) { int o = __offset(4); return o != 0 ? __string(__vector(o) + j * 4) : null; }
public int objectIdsLength() { int o = __offset(4); return o != 0 ? __vector_len(o) : 0; }
public String data() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer dataAsByteBuffer() { return __vector_as_bytebuffer(6, 1); }
public ByteBuffer dataInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); }
public static int createArg(FlatBufferBuilder builder,
int object_idsOffset,
int dataOffset) {
builder.startObject(2);
Arg.addData(builder, dataOffset);
Arg.addObjectIds(builder, object_idsOffset);
return Arg.endArg(builder);
}
public static void startArg(FlatBufferBuilder builder) { builder.startObject(2); }
public static void addObjectIds(FlatBufferBuilder builder, int objectIdsOffset) { builder.addOffset(0, objectIdsOffset, 0); }
public static int createObjectIdsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
public static void startObjectIdsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
public static void addData(FlatBufferBuilder builder, int dataOffset) { builder.addOffset(1, dataOffset, 0); }
public static int endArg(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;
}
//this is manually added to avoid encoding/decoding cost as our object id is a byte array
// instead of a string
public ByteBuffer objectIdAsByteBuffer(int j) {
int o = __offset(4);
if (o == 0) {
return null;
}
int offset = __vector(o) + j * 4;
offset += bb.getInt(offset);
ByteBuffer src = bb.duplicate().order(ByteOrder.LITTLE_ENDIAN);
int length = src.getInt(offset);
src.position(offset + 4);
src.limit(offset + 4 + length);
return src;
}
}

View file

@ -1,79 +0,0 @@
package org.ray.runtime.generated;
// automatically generated by the FlatBuffers compiler, do not modify
import java.nio.*;
import java.lang.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
public final class ClientTableData extends Table {
public static ClientTableData getRootAsClientTableData(ByteBuffer _bb) { return getRootAsClientTableData(_bb, new ClientTableData()); }
public static ClientTableData getRootAsClientTableData(ByteBuffer _bb, ClientTableData obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
public ClientTableData __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public String clientId() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer clientIdAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
public ByteBuffer clientIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 4, 1); }
public String nodeManagerAddress() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer nodeManagerAddressAsByteBuffer() { return __vector_as_bytebuffer(6, 1); }
public ByteBuffer nodeManagerAddressInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); }
public String rayletSocketName() { int o = __offset(8); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer rayletSocketNameAsByteBuffer() { return __vector_as_bytebuffer(8, 1); }
public ByteBuffer rayletSocketNameInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 8, 1); }
public String objectStoreSocketName() { int o = __offset(10); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer objectStoreSocketNameAsByteBuffer() { return __vector_as_bytebuffer(10, 1); }
public ByteBuffer objectStoreSocketNameInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 10, 1); }
public int nodeManagerPort() { int o = __offset(12); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
public int objectManagerPort() { int o = __offset(14); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
public boolean isInsertion() { int o = __offset(16); return o != 0 ? 0!=bb.get(o + bb_pos) : false; }
public String resourcesTotalLabel(int j) { int o = __offset(18); return o != 0 ? __string(__vector(o) + j * 4) : null; }
public int resourcesTotalLabelLength() { int o = __offset(18); return o != 0 ? __vector_len(o) : 0; }
public double resourcesTotalCapacity(int j) { int o = __offset(20); return o != 0 ? bb.getDouble(__vector(o) + j * 8) : 0; }
public int resourcesTotalCapacityLength() { int o = __offset(20); return o != 0 ? __vector_len(o) : 0; }
public ByteBuffer resourcesTotalCapacityAsByteBuffer() { return __vector_as_bytebuffer(20, 8); }
public ByteBuffer resourcesTotalCapacityInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 20, 8); }
public static int createClientTableData(FlatBufferBuilder builder,
int client_idOffset,
int node_manager_addressOffset,
int raylet_socket_nameOffset,
int object_store_socket_nameOffset,
int node_manager_port,
int object_manager_port,
boolean is_insertion,
int resources_total_labelOffset,
int resources_total_capacityOffset) {
builder.startObject(9);
ClientTableData.addResourcesTotalCapacity(builder, resources_total_capacityOffset);
ClientTableData.addResourcesTotalLabel(builder, resources_total_labelOffset);
ClientTableData.addObjectManagerPort(builder, object_manager_port);
ClientTableData.addNodeManagerPort(builder, node_manager_port);
ClientTableData.addObjectStoreSocketName(builder, object_store_socket_nameOffset);
ClientTableData.addRayletSocketName(builder, raylet_socket_nameOffset);
ClientTableData.addNodeManagerAddress(builder, node_manager_addressOffset);
ClientTableData.addClientId(builder, client_idOffset);
ClientTableData.addIsInsertion(builder, is_insertion);
return ClientTableData.endClientTableData(builder);
}
public static void startClientTableData(FlatBufferBuilder builder) { builder.startObject(9); }
public static void addClientId(FlatBufferBuilder builder, int clientIdOffset) { builder.addOffset(0, clientIdOffset, 0); }
public static void addNodeManagerAddress(FlatBufferBuilder builder, int nodeManagerAddressOffset) { builder.addOffset(1, nodeManagerAddressOffset, 0); }
public static void addRayletSocketName(FlatBufferBuilder builder, int rayletSocketNameOffset) { builder.addOffset(2, rayletSocketNameOffset, 0); }
public static void addObjectStoreSocketName(FlatBufferBuilder builder, int objectStoreSocketNameOffset) { builder.addOffset(3, objectStoreSocketNameOffset, 0); }
public static void addNodeManagerPort(FlatBufferBuilder builder, int nodeManagerPort) { builder.addInt(4, nodeManagerPort, 0); }
public static void addObjectManagerPort(FlatBufferBuilder builder, int objectManagerPort) { builder.addInt(5, objectManagerPort, 0); }
public static void addIsInsertion(FlatBufferBuilder builder, boolean isInsertion) { builder.addBoolean(6, isInsertion, false); }
public static void addResourcesTotalLabel(FlatBufferBuilder builder, int resourcesTotalLabelOffset) { builder.addOffset(7, resourcesTotalLabelOffset, 0); }
public static int createResourcesTotalLabelVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
public static void startResourcesTotalLabelVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
public static void addResourcesTotalCapacity(FlatBufferBuilder builder, int resourcesTotalCapacityOffset) { builder.addOffset(8, resourcesTotalCapacityOffset, 0); }
public static int createResourcesTotalCapacityVector(FlatBufferBuilder builder, double[] data) { builder.startVector(8, data.length, 8); for (int i = data.length - 1; i >= 0; i--) builder.addDouble(data[i]); return builder.endVector(); }
public static void startResourcesTotalCapacityVector(FlatBufferBuilder builder, int numElems) { builder.startVector(8, numElems, 8); }
public static int endClientTableData(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;
}
}

View file

@ -1,14 +0,0 @@
// automatically generated by the FlatBuffers compiler, do not modify
package org.ray.runtime.generated;
public final class Language {
private Language() { }
public static final int PYTHON = 0;
public static final int CPP = 1;
public static final int JAVA = 2;
public static final String[] names = { "PYTHON", "CPP", "JAVA", };
public static String name(int e) { return names[e]; }
}

View file

@ -1,38 +0,0 @@
// automatically generated by the FlatBuffers compiler, do not modify
package org.ray.runtime.generated;
import java.nio.*;
import java.lang.*;
import com.google.flatbuffers.*;
@SuppressWarnings("unused")
public final class ResourcePair extends Table {
public static ResourcePair getRootAsResourcePair(ByteBuffer _bb) { return getRootAsResourcePair(_bb, new ResourcePair()); }
public static ResourcePair getRootAsResourcePair(ByteBuffer _bb, ResourcePair obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
public ResourcePair __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public String key() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer keyAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
public ByteBuffer keyInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 4, 1); }
public double value() { int o = __offset(6); return o != 0 ? bb.getDouble(o + bb_pos) : 0.0; }
public static int createResourcePair(FlatBufferBuilder builder,
int keyOffset,
double value) {
builder.startObject(2);
ResourcePair.addValue(builder, value);
ResourcePair.addKey(builder, keyOffset);
return ResourcePair.endResourcePair(builder);
}
public static void startResourcePair(FlatBufferBuilder builder) { builder.startObject(2); }
public static void addKey(FlatBufferBuilder builder, int keyOffset) { builder.addOffset(0, keyOffset, 0); }
public static void addValue(FlatBufferBuilder builder, double value) { builder.addDouble(1, value, 0.0); }
public static int endResourcePair(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;
}
}

View file

@ -1,155 +0,0 @@
// automatically generated by the FlatBuffers compiler, do not modify
package org.ray.runtime.generated;
import com.google.flatbuffers.FlatBufferBuilder;
import com.google.flatbuffers.Table;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@SuppressWarnings("unused")
public final class TaskInfo extends Table {
public static TaskInfo getRootAsTaskInfo(ByteBuffer _bb) { return getRootAsTaskInfo(_bb, new TaskInfo()); }
public static TaskInfo getRootAsTaskInfo(ByteBuffer _bb, TaskInfo obj) { _bb.order(ByteOrder.LITTLE_ENDIAN); return (obj.__assign(_bb.getInt(_bb.position()) + _bb.position(), _bb)); }
public void __init(int _i, ByteBuffer _bb) { bb_pos = _i; bb = _bb; }
public TaskInfo __assign(int _i, ByteBuffer _bb) { __init(_i, _bb); return this; }
public String driverId() { int o = __offset(4); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer driverIdAsByteBuffer() { return __vector_as_bytebuffer(4, 1); }
public ByteBuffer driverIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 4, 1); }
public String taskId() { int o = __offset(6); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer taskIdAsByteBuffer() { return __vector_as_bytebuffer(6, 1); }
public ByteBuffer taskIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 6, 1); }
public String parentTaskId() { int o = __offset(8); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer parentTaskIdAsByteBuffer() { return __vector_as_bytebuffer(8, 1); }
public ByteBuffer parentTaskIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 8, 1); }
public int parentCounter() { int o = __offset(10); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
public String actorCreationId() { int o = __offset(12); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer actorCreationIdAsByteBuffer() { return __vector_as_bytebuffer(12, 1); }
public ByteBuffer actorCreationIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 12, 1); }
public String actorCreationDummyObjectId() { int o = __offset(14); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer actorCreationDummyObjectIdAsByteBuffer() { return __vector_as_bytebuffer(14, 1); }
public ByteBuffer actorCreationDummyObjectIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 14, 1); }
public int maxActorReconstructions() { int o = __offset(16); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
public String actorId() { int o = __offset(18); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer actorIdAsByteBuffer() { return __vector_as_bytebuffer(18, 1); }
public ByteBuffer actorIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 18, 1); }
public String actorHandleId() { int o = __offset(20); return o != 0 ? __string(o + bb_pos) : null; }
public ByteBuffer actorHandleIdAsByteBuffer() { return __vector_as_bytebuffer(20, 1); }
public ByteBuffer actorHandleIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 20, 1); }
public int actorCounter() { int o = __offset(22); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
public boolean isActorCheckpointMethod() { int o = __offset(24); return o != 0 ? 0!=bb.get(o + bb_pos) : false; }
public String newActorHandles(int j) { int o = __offset(26); return o != 0 ? __string(__vector(o) + j * 4) : null; }
public int newActorHandlesLength() { int o = __offset(26); return o != 0 ? __vector_len(o) : 0; }
public Arg args(int j) { return args(new Arg(), j); }
public Arg args(Arg obj, int j) { int o = __offset(28); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
public int argsLength() { int o = __offset(28); return o != 0 ? __vector_len(o) : 0; }
public String returns(int j) { int o = __offset(30); return o != 0 ? __string(__vector(o) + j * 4) : null; }
public int returnsLength() { int o = __offset(30); return o != 0 ? __vector_len(o) : 0; }
public ResourcePair requiredResources(int j) { return requiredResources(new ResourcePair(), j); }
public ResourcePair requiredResources(ResourcePair obj, int j) { int o = __offset(32); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
public int requiredResourcesLength() { int o = __offset(32); return o != 0 ? __vector_len(o) : 0; }
public ResourcePair requiredPlacementResources(int j) { return requiredPlacementResources(new ResourcePair(), j); }
public ResourcePair requiredPlacementResources(ResourcePair obj, int j) { int o = __offset(34); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; }
public int requiredPlacementResourcesLength() { int o = __offset(34); return o != 0 ? __vector_len(o) : 0; }
public int language() { int o = __offset(36); return o != 0 ? bb.getInt(o + bb_pos) : 0; }
public String functionDescriptor(int j) { int o = __offset(38); return o != 0 ? __string(__vector(o) + j * 4) : null; }
public int functionDescriptorLength() { int o = __offset(38); return o != 0 ? __vector_len(o) : 0; }
public static int createTaskInfo(FlatBufferBuilder builder,
int driver_idOffset,
int task_idOffset,
int parent_task_idOffset,
int parent_counter,
int actor_creation_idOffset,
int actor_creation_dummy_object_idOffset,
int max_actor_reconstructions,
int actor_idOffset,
int actor_handle_idOffset,
int actor_counter,
boolean is_actor_checkpoint_method,
int new_actor_handlesOffset,
int argsOffset,
int returnsOffset,
int required_resourcesOffset,
int required_placement_resourcesOffset,
int language,
int function_descriptorOffset) {
builder.startObject(18);
TaskInfo.addFunctionDescriptor(builder, function_descriptorOffset);
TaskInfo.addLanguage(builder, language);
TaskInfo.addRequiredPlacementResources(builder, required_placement_resourcesOffset);
TaskInfo.addRequiredResources(builder, required_resourcesOffset);
TaskInfo.addReturns(builder, returnsOffset);
TaskInfo.addArgs(builder, argsOffset);
TaskInfo.addNewActorHandles(builder, new_actor_handlesOffset);
TaskInfo.addActorCounter(builder, actor_counter);
TaskInfo.addActorHandleId(builder, actor_handle_idOffset);
TaskInfo.addActorId(builder, actor_idOffset);
TaskInfo.addMaxActorReconstructions(builder, max_actor_reconstructions);
TaskInfo.addActorCreationDummyObjectId(builder, actor_creation_dummy_object_idOffset);
TaskInfo.addActorCreationId(builder, actor_creation_idOffset);
TaskInfo.addParentCounter(builder, parent_counter);
TaskInfo.addParentTaskId(builder, parent_task_idOffset);
TaskInfo.addTaskId(builder, task_idOffset);
TaskInfo.addDriverId(builder, driver_idOffset);
TaskInfo.addIsActorCheckpointMethod(builder, is_actor_checkpoint_method);
return TaskInfo.endTaskInfo(builder);
}
public static void startTaskInfo(FlatBufferBuilder builder) { builder.startObject(18); }
public static void addDriverId(FlatBufferBuilder builder, int driverIdOffset) { builder.addOffset(0, driverIdOffset, 0); }
public static void addTaskId(FlatBufferBuilder builder, int taskIdOffset) { builder.addOffset(1, taskIdOffset, 0); }
public static void addParentTaskId(FlatBufferBuilder builder, int parentTaskIdOffset) { builder.addOffset(2, parentTaskIdOffset, 0); }
public static void addParentCounter(FlatBufferBuilder builder, int parentCounter) { builder.addInt(3, parentCounter, 0); }
public static void addActorCreationId(FlatBufferBuilder builder, int actorCreationIdOffset) { builder.addOffset(4, actorCreationIdOffset, 0); }
public static void addActorCreationDummyObjectId(FlatBufferBuilder builder, int actorCreationDummyObjectIdOffset) { builder.addOffset(5, actorCreationDummyObjectIdOffset, 0); }
public static void addMaxActorReconstructions(FlatBufferBuilder builder, int maxActorReconstructions) { builder.addInt(6, maxActorReconstructions, 0); }
public static void addActorId(FlatBufferBuilder builder, int actorIdOffset) { builder.addOffset(7, actorIdOffset, 0); }
public static void addActorHandleId(FlatBufferBuilder builder, int actorHandleIdOffset) { builder.addOffset(8, actorHandleIdOffset, 0); }
public static void addActorCounter(FlatBufferBuilder builder, int actorCounter) { builder.addInt(9, actorCounter, 0); }
public static void addIsActorCheckpointMethod(FlatBufferBuilder builder, boolean isActorCheckpointMethod) { builder.addBoolean(10, isActorCheckpointMethod, false); }
public static void addNewActorHandles(FlatBufferBuilder builder, int newActorHandlesOffset) { builder.addOffset(11, newActorHandlesOffset, 0); }
public static int createNewActorHandlesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
public static void startNewActorHandlesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
public static void addArgs(FlatBufferBuilder builder, int argsOffset) { builder.addOffset(12, argsOffset, 0); }
public static int createArgsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
public static void startArgsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
public static void addReturns(FlatBufferBuilder builder, int returnsOffset) { builder.addOffset(13, returnsOffset, 0); }
public static int createReturnsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
public static void startReturnsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
public static void addRequiredResources(FlatBufferBuilder builder, int requiredResourcesOffset) { builder.addOffset(14, requiredResourcesOffset, 0); }
public static int createRequiredResourcesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
public static void startRequiredResourcesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
public static void addRequiredPlacementResources(FlatBufferBuilder builder, int requiredPlacementResourcesOffset) { builder.addOffset(15, requiredPlacementResourcesOffset, 0); }
public static int createRequiredPlacementResourcesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
public static void startRequiredPlacementResourcesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
public static void addLanguage(FlatBufferBuilder builder, int language) { builder.addInt(16, language, 0); }
public static void addFunctionDescriptor(FlatBufferBuilder builder, int functionDescriptorOffset) { builder.addOffset(17, functionDescriptorOffset, 0); }
public static int createFunctionDescriptorVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); }
public static void startFunctionDescriptorVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); }
public static int endTaskInfo(FlatBufferBuilder builder) {
int o = builder.endObject();
return o;
}
/** This is manually added to avoid encoding/decoding cost as our object
* id is a byte array instead of a string.
* This function is error-prone. If the fields before `returns` changed,
* the offset number should be changed accordingly.
* TODO(yuhguo): fix this error-prone funciton.
*/
public ByteBuffer returnsAsByteBuffer(int j) {
int o = __offset(30);
if (o == 0) {
return null;
}
int offset = __vector(o) + j * 4;
offset += bb.getInt(offset);
ByteBuffer src = bb.duplicate().order(ByteOrder.LITTLE_ENDIAN);
int length = src.getInt(offset);
src.position(offset + 4);
src.limit(offset + 4 + length);
return src;
}
}

View file

@ -146,10 +146,12 @@ public class RayletClientImpl implements RayletClient {
FunctionArg[] args = new FunctionArg[info.argsLength()];
for (int i = 0; i < info.argsLength(); i++) {
Arg arg = info.args(i);
if (arg.objectIdsLength() > 0) {
Preconditions.checkArgument(arg.objectIdsLength() == 1,
"This arg has more than one id: {}", arg.objectIdsLength());
UniqueId id = UniqueId.fromByteBuffer(arg.objectIdAsByteBuffer(0));
int objectIdsLength = arg.objectIdsAsByteBuffer().remaining() / UniqueId.LENGTH;
if (objectIdsLength > 0) {
Preconditions.checkArgument(objectIdsLength == 1,
"This arg has more than one id: {}", objectIdsLength);
UniqueId id = UniqueIdUtil.getUniqueIdsFromByteBuffer(arg.objectIdsAsByteBuffer())[0];
args[i] = FunctionArg.passByReference(id);
} else {
ByteBuffer lbb = arg.dataAsByteBuffer();
@ -160,10 +162,8 @@ public class RayletClientImpl implements RayletClient {
}
}
// Deserialize return ids
UniqueId[] returnIds = new UniqueId[info.returnsLength()];
for (int i = 0; i < info.returnsLength(); i++) {
returnIds[i] = UniqueId.fromByteBuffer(info.returnsAsByteBuffer(i));
}
UniqueId[] returnIds = UniqueIdUtil.getUniqueIdsFromByteBuffer(info.returnsAsByteBuffer());
// Deserialize required resources;
Map<String, Double> resources = new HashMap<>();
for (int i = 0; i < info.requiredResourcesLength(); i++) {
@ -175,8 +175,8 @@ public class RayletClientImpl implements RayletClient {
info.functionDescriptor(0), info.functionDescriptor(1), info.functionDescriptor(2)
);
return new TaskSpec(driverId, taskId, parentTaskId, parentCounter, actorCreationId,
maxActorReconstructions, actorId, actorHandleId, actorCounter, args, returnIds, resources,
functionDescriptor);
maxActorReconstructions, actorId, actorHandleId, actorCounter,
args, returnIds, resources, functionDescriptor);
}
private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
@ -194,22 +194,21 @@ public class RayletClientImpl implements RayletClient {
final int actorIdOffset = fbb.createString(task.actorId.toByteBuffer());
final int actorHandleIdOffset = fbb.createString(task.actorHandleId.toByteBuffer());
final int actorCounter = task.actorCounter;
// Serialize the new actor handles.
int[] newActorHandlesOffsets = new int[task.newActorHandles.length];
for (int i = 0; i < newActorHandlesOffsets.length; i++) {
newActorHandlesOffsets[i] = fbb.createString(task.newActorHandles[i].toByteBuffer());
}
int newActorHandlesOffset = fbb.createVectorOfTables(newActorHandlesOffsets);
int newActorHandlesOffset
= fbb.createString(UniqueIdUtil.concatUniqueIds(task.newActorHandles));
// Serialize args
int[] argsOffsets = new int[task.args.length];
for (int i = 0; i < argsOffsets.length; i++) {
int objectIdOffset = 0;
int dataOffset = 0;
if (task.args[i].id != null) {
int[] idOffsets = new int[]{fbb.createString(task.args[i].id.toByteBuffer())};
objectIdOffset = fbb.createVectorOfTables(idOffsets);
objectIdOffset = fbb.createString(
UniqueIdUtil.concatUniqueIds(new UniqueId[] {task.args[i].id}));
} else {
objectIdOffset = fbb.createVectorOfTables(new int[0]);
objectIdOffset = fbb.createString("");
}
if (task.args[i].data != null) {
dataOffset = fbb.createString(ByteBuffer.wrap(task.args[i].data));
@ -217,13 +216,10 @@ public class RayletClientImpl implements RayletClient {
argsOffsets[i] = Arg.createArg(fbb, objectIdOffset, dataOffset);
}
int argsOffset = fbb.createVectorOfTables(argsOffsets);
// Serialize returns
int returnCount = task.returnIds.length;
int[] returnsOffsets = new int[returnCount];
for (int k = 0; k < returnCount; k++) {
returnsOffsets[k] = fbb.createString(task.returnIds[k].toByteBuffer());
}
int returnsOffset = fbb.createVectorOfTables(returnsOffsets);
int returnsOffset = fbb.createString(UniqueIdUtil.concatUniqueIds(task.returnIds));
// Serialize required resources
// The required_resources vector indicates the quantities of the different
// resources required by this task. The index in this vector corresponds to

View file

@ -4,7 +4,6 @@ import org.ray.api.id.UniqueId;
/**
* Represents a function argument in task spec.
*
* Either `id` or `data` should be null, when id is not null, this argument will be
* passed by reference, otherwise it will be passed by value.
*/

View file

@ -1,10 +1,10 @@
package org.ray.runtime.util;
import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import org.ray.api.id.UniqueId;
@ -94,4 +94,44 @@ public class UniqueIdUtil {
}
return ids;
}
/**
* Get unique IDs from concatenated ByteBuffer.
*
* @param byteBufferOfIds The ByteBuffer concatenated from IDs.
* @return The array of unique IDs.
*/
public static UniqueId[] getUniqueIdsFromByteBuffer(ByteBuffer byteBufferOfIds) {
Preconditions.checkArgument(byteBufferOfIds != null);
byte[] bytesOfIds = new byte[byteBufferOfIds.remaining()];
byteBufferOfIds.get(bytesOfIds, 0, byteBufferOfIds.remaining());
int count = bytesOfIds.length / UniqueId.LENGTH;
UniqueId[] uniqueIds = new UniqueId[count];
for (int i = 0; i < count; ++i) {
byte[] id = new byte[UniqueId.LENGTH];
System.arraycopy(bytesOfIds, i * UniqueId.LENGTH, id, 0, UniqueId.LENGTH);
uniqueIds[i] = UniqueId.fromByteBuffer(ByteBuffer.wrap(id));
}
return uniqueIds;
}
/**
* Concatenate IDs to a ByteBuffer.
*
* @param ids The array of IDs that will be concatenated.
* @return A ByteBuffer that contains bytes of concatenated IDs.
*/
public static ByteBuffer concatUniqueIds(UniqueId[] ids) {
byte[] bytesOfIds = new byte[UniqueId.LENGTH * ids.length];
for (int i = 0; i < ids.length; ++i) {
System.arraycopy(ids[i].getBytes(), 0, bytesOfIds,
i * UniqueId.LENGTH, UniqueId.LENGTH);
}
return ByteBuffer.wrap(bytesOfIds);
}
}

View file

@ -2,6 +2,7 @@ package org.ray.api.test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import javax.xml.bind.DatatypeConverter;
import org.junit.Assert;
import org.junit.Test;
@ -79,4 +80,20 @@ public class UniqueIdTest {
Assert.assertEquals("FCFCFDFE123456789ABCDEF123456789ABCDEF00".toLowerCase(), putId.toString());
}
@Test
public void testUniqueIdsAndByteBufferInterConversion() {
final int len = 5;
UniqueId[] ids = new UniqueId[len];
for (int i = 0; i < len; ++i) {
ids[i] = UniqueId.randomId();
}
ByteBuffer temp = UniqueIdUtil.concatUniqueIds(ids);
UniqueId[] res = UniqueIdUtil.getUniqueIdsFromByteBuffer(temp);
for (int i = 0; i < len; ++i) {
Assert.assertEquals(ids[i], res[i]);
}
}
}

View file

@ -24,6 +24,32 @@ const std::vector<ray::ObjectID> from_flatbuf(
return object_ids;
}
const std::vector<ray::ObjectID> object_ids_from_flatbuf(
const flatbuffers::String &string) {
const auto &object_ids = string_from_flatbuf(string);
std::vector<ray::ObjectID> ret;
RAY_CHECK(object_ids.size() % kUniqueIDSize == 0);
auto count = object_ids.size() / kUniqueIDSize;
for (size_t i = 0; i < count; ++i) {
auto pos = static_cast<size_t>(kUniqueIDSize * i);
const auto &id = object_ids.substr(pos, kUniqueIDSize);
ret.push_back(ray::ObjectID::from_binary(id));
}
return ret;
}
flatbuffers::Offset<flatbuffers::String> object_ids_to_flatbuf(
flatbuffers::FlatBufferBuilder &fbb, const std::vector<ray::ObjectID> &object_ids) {
std::string result;
for (const auto &id : object_ids) {
result += id.binary();
}
return fbb.CreateString(result);
}
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
to_flatbuf(flatbuffers::FlatBufferBuilder &fbb, ray::ObjectID object_ids[],
int64_t num_objects) {

View file

@ -28,6 +28,23 @@ ray::ObjectID from_flatbuf(const flatbuffers::String &string);
const std::vector<ray::ObjectID> from_flatbuf(
const flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>> &vector);
/// Convert a flatbuffer of string that concatenated
/// object IDs to a vector of object IDs.
///
/// @param vector The flatbuffer vector.
/// @return The vector of object IDs.
const std::vector<ray::ObjectID> object_ids_from_flatbuf(
const flatbuffers::String &string);
/// Convert a vector of object IDs to a flatbuffer string.
/// The IDs are concatenated to a string with binary.
///
/// @param fbb Reference to the flatbuffer builder.
/// @param object_ids The vector of object IDs.
/// @return Flatbuffer string of concatenated IDs.
flatbuffers::Offset<flatbuffers::String> object_ids_to_flatbuf(
flatbuffers::FlatBufferBuilder &fbb, const std::vector<ray::ObjectID> &object_ids);
/// Convert an array of object IDs to a flatbuffer vector of strings.
///
/// @param fbb Reference to the flatbuffer builder.

View file

@ -23,14 +23,25 @@ add_custom_command(
VERBATIM)
add_custom_target(gen_gcs_fbs DEPENDS ${GCS_FBS_OUTPUT_FILES})
set(RAY_HOME ${CMAKE_CURRENT_LIST_DIR}/../../..)
# Generate Python bindings for the flatbuffers objects.
set(PYTHON_OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/../../../python/ray/core/generated/)
set(PYTHON_OUTPUT_DIR ${RAY_HOME}/python/ray/core/generated/)
add_custom_command(
TARGET gen_gcs_fbs
COMMAND ${FLATBUFFERS_COMPILER} -p -o ${PYTHON_OUTPUT_DIR} ${GCS_FBS_SRC}
DEPENDS ${FBS_DEPENDS}
COMMENT "Running flatc compiler on ${GCS_FBS_SRC}"
COMMENT "Running flatc compiler on ${GCS_FBS_SRC} for python"
VERBATIM)
# Generate Java bindings for the flatbuffers objects.
set(JAVA_OUTPUT_DIR ${RAY_HOME}/java/runtime/src/main/java/org/ray/runtime/generated/)
add_custom_command(
TARGET gen_gcs_fbs
COMMAND ${FLATBUFFERS_COMPILER} -j -o ${JAVA_OUTPUT_DIR} ${GCS_FBS_SRC}
COMMAND ${PYTHON_EXECUTABLE} ${RAY_HOME}/java/modify_generated_java_flatbuffers_files.py ${RAY_HOME}
DEPENDS ${FBS_DEPENDS}
COMMENT "Running flatc compiler on ${GCS_FBS_SRC} for Java"
VERBATIM)
ADD_RAY_TEST(client_test STATIC_LINK_LIBS ray_static ${PLASMA_STATIC_LIB} ${ARROW_STATIC_LIB} gtest gtest_main pthread ${Boost_SYSTEM_LIBRARY})

View file

@ -42,7 +42,8 @@ table Arg {
// object ID in this list which represents the object that is being passed.
// However to support reducers in a MapReduce workload, we also support
// passing multiple object IDs for each argument.
object_ids: [string];
// Note that this is a long string that concatenate all of the object IDs.
object_ids: string;
// Data for pass-by-value arguments.
data: string;
}
@ -76,11 +77,13 @@ table TaskInfo {
// If this is an actor task, then this will be populated with all of the new
// actor handles that were forked from this handle since the last task on
// this handle was submitted.
new_actor_handles: [string];
// Note that this is a long string that concatenate all of the new_actor_handle IDs.
new_actor_handles: string;
// Task arguments.
args: [Arg];
// Object IDs of return values.
returns: [string];
// Object IDs of return values. This is a long string that concatenate
// all of the return object IDs of this task.
returns: string;
// The required_resources vector indicates the quantities of the different
// resources required by this task.
required_resources: [ResourcePair];

View file

@ -17,7 +17,7 @@ TaskArgumentByReference::TaskArgumentByReference(const std::vector<ObjectID> &re
flatbuffers::Offset<Arg> TaskArgumentByReference::ToFlatbuffer(
flatbuffers::FlatBufferBuilder &fbb) const {
return CreateArg(fbb, to_flatbuf(fbb, references_));
return CreateArg(fbb, object_ids_to_flatbuf(fbb, references_));
}
TaskArgumentByValue::TaskArgumentByValue(const uint8_t *value, size_t length) {
@ -28,7 +28,7 @@ flatbuffers::Offset<Arg> TaskArgumentByValue::ToFlatbuffer(
flatbuffers::FlatBufferBuilder &fbb) const {
auto arg =
fbb.CreateString(reinterpret_cast<const char *>(value_.data()), value_.size());
auto empty_ids = fbb.CreateVectorOfStrings({});
const auto &empty_ids = fbb.CreateString("");
return CreateArg(fbb, empty_ids, arg);
}
@ -88,11 +88,10 @@ TaskSpecification::TaskSpecification(
arguments.push_back(argument->ToFlatbuffer(fbb));
}
// Add return object IDs.
std::vector<flatbuffers::Offset<flatbuffers::String>> returns;
for (int64_t i = 1; i < num_returns + 1; i++) {
ObjectID return_id = ComputeReturnId(task_id, i);
returns.push_back(to_flatbuf(fbb, return_id));
// Generate return ids.
std::vector<ray::ObjectID> returns;
for (int64_t i = 1; i < num_returns + 1; ++i) {
returns.push_back(ComputeReturnId(task_id, i));
}
// Serialize the TaskSpecification.
@ -101,8 +100,8 @@ TaskSpecification::TaskSpecification(
to_flatbuf(fbb, parent_task_id), parent_counter, to_flatbuf(fbb, actor_creation_id),
to_flatbuf(fbb, actor_creation_dummy_object_id), max_actor_reconstructions,
to_flatbuf(fbb, actor_id), to_flatbuf(fbb, actor_handle_id), actor_counter, false,
to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments),
fbb.CreateVector(returns), map_to_flatbuf(fbb, required_resources),
object_ids_to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments),
object_ids_to_flatbuf(fbb, returns), map_to_flatbuf(fbb, required_resources),
map_to_flatbuf(fbb, required_placement_resources), language,
string_vec_to_flatbuf(fbb, function_descriptor));
fbb.Finish(spec);
@ -164,12 +163,12 @@ int64_t TaskSpecification::NumArgs() const {
int64_t TaskSpecification::NumReturns() const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
return message->returns()->size();
return (message->returns()->size() / kUniqueIDSize);
}
ObjectID TaskSpecification::ReturnId(int64_t return_index) const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
return from_flatbuf(*message->returns()->Get(return_index));
return object_ids_from_flatbuf(*message->returns())[return_index];
}
bool TaskSpecification::ArgByRef(int64_t arg_index) const {
@ -179,12 +178,14 @@ bool TaskSpecification::ArgByRef(int64_t arg_index) const {
int TaskSpecification::ArgIdCount(int64_t arg_index) const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
auto ids = message->args()->Get(arg_index)->object_ids();
return ids->size();
return (ids->size() / kUniqueIDSize);
}
ObjectID TaskSpecification::ArgId(int64_t arg_index, int64_t id_index) const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
return from_flatbuf(*message->args()->Get(arg_index)->object_ids()->Get(id_index));
const auto &object_ids =
object_ids_from_flatbuf(*message->args()->Get(arg_index)->object_ids());
return object_ids[id_index];
}
const uint8_t *TaskSpecification::ArgVal(int64_t arg_index) const {
@ -266,7 +267,7 @@ ObjectID TaskSpecification::ActorDummyObject() const {
std::vector<ActorHandleID> TaskSpecification::NewActorHandles() const {
auto message = flatbuffers::GetRoot<TaskInfo>(spec_.data());
return from_flatbuf(*message->new_actor_handles());
return object_ids_from_flatbuf(*message->new_actor_handles());
}
} // namespace raylet