Increase the number of unique bits for actors to avoid handle collisions (#12894)

Eric Liang, 2020-12-18 15:59:03 -08:00 (committed by GitHub)
parent 3521e74f3a
commit 3e492a79ec
19 changed files with 54 additions and 54 deletions
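
Why 12 unique bytes: with only 4 random bytes per ActorID, the birthday bound makes handle collisions realistic for long-lived jobs. A back-of-the-envelope sketch (not part of the commit) of the collision probability:

import math

def collision_probability(num_ids, unique_bytes):
    # Birthday approximation: P(collision) ~= 1 - exp(-n^2 / 2^(k+1))
    k = unique_bytes * 8
    return 1 - math.exp(-num_ids ** 2 / 2 ** (k + 1))

# With the old 4 unique bytes (32 bits), a job that creates ~100k actor ids
# over its lifetime is more likely than not to hit a collision; with 12
# bytes (96 bits) the probability is negligible.
print(collision_probability(100_000, 4))   # ~0.69
print(collision_probability(100_000, 12))  # ~6e-20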

View file

@@ -112,20 +112,16 @@ def test_memory_table(disable_aiohttp_cache, ray_start_with_dashboard):
     def check_mem_table():
         resp = requests.get(f"{webui_url}/memory/memory_table")
         resp_data = resp.json()
-        if not resp_data["result"]:
-            return False
+        assert resp_data["result"]
         latest_memory_table = resp_data["data"]["memoryTable"]
         summary = latest_memory_table["summary"]
-        try:
-            # 1 ref per handle and per object the actor has a ref to
-            assert summary["totalActorHandles"] == len(actors) * 2
-            # 1 ref for my_obj
-            assert summary["totalLocalRefCount"] == 1
-            return True
-        except AssertionError:
-            return False
+        # 1 ref per handle and per object the actor has a ref to
+        assert summary["totalActorHandles"] == len(actors) * 2
+        # 1 ref for my_obj
+        assert summary["totalLocalRefCount"] == 1

-    wait_for_condition(check_mem_table, 10)
+    wait_until_succeeded_without_exception(
+        check_mem_table, (AssertionError, ), timeout_ms=1000)


 def test_get_all_node_details(disable_aiohttp_cache, ray_start_with_dashboard):
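
The test above now asserts directly and delegates retrying to wait_until_succeeded_without_exception. A minimal sketch of that helper's contract, inferred from the call site (the real implementation lives in Ray's dashboard test utilities):

import time

def wait_until_succeeded_without_exception(func, exceptions, timeout_ms=1000,
                                           retry_interval_ms=100):
    # Retry func until it stops raising the given exception types or the
    # timeout elapses; return whether it eventually succeeded.
    deadline = time.monotonic() + timeout_ms / 1000
    while time.monotonic() < deadline:
        try:
            func()
            return True
        except exceptions:
            time.sleep(retry_interval_ms / 1000)
    return False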

View file

@@ -7,8 +7,9 @@ from ray.new_dashboard.memory_utils import (
 NODE_ADDRESS = "127.0.0.1"
 IS_DRIVER = True
 PID = 1
-OBJECT_ID = "7wpsIhgZiBz/////AQAAyAEAAAA="
-ACTOR_ID = "fffffffffffffffff66d17ba010000c801000000"
+OBJECT_ID = "ZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZg=="
+ACTOR_ID = "fffffffffffffffffffffffffffffffff66d17ba010000c801000000"
 DECODED_ID = decode_object_ref_if_needed(OBJECT_ID)
 OBJECT_SIZE = 100

View file

@@ -7,7 +7,7 @@ import java.util.Random;
 public class ActorId extends BaseId implements Serializable {
-  private static final int UNIQUE_BYTES_LENGTH = 4;
+  private static final int UNIQUE_BYTES_LENGTH = 12;

   public static final int LENGTH = JobId.LENGTH + UNIQUE_BYTES_LENGTH;

View file

@@ -10,7 +10,7 @@ import java.util.Random;
  */
 public class ObjectId extends BaseId implements Serializable {
-  public static final int LENGTH = 20;
+  public static final int LENGTH = 28;

   /**
    * Create an ObjectId from a ByteBuffer.

View file

@@ -11,7 +11,7 @@ import java.util.Random;
  */
 public class UniqueId extends BaseId implements Serializable {
-  public static final int LENGTH = 20;
+  public static final int LENGTH = 28;

   public static final UniqueId NIL = genNil();

   /**

View file

@@ -1,7 +1,6 @@
 package io.ray.runtime;

 import io.ray.api.id.UniqueId;
-import io.ray.runtime.util.IdUtil;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import javax.xml.bind.DatatypeConverter;
@@ -13,12 +12,12 @@ public class UniqueIdTest {
   @Test
   public void testConstructUniqueId() {
     // Test `fromHexString()`
-    UniqueId id1 = UniqueId.fromHexString("00000000123456789ABCDEF123456789ABCDEF00");
-    Assert.assertEquals("00000000123456789abcdef123456789abcdef00", id1.toString());
+    UniqueId id1 = UniqueId.fromHexString("00000000123456789ABCDEF123456789ABCDEF0123456789ABCDEF00");
+    Assert.assertEquals("00000000123456789abcdef123456789abcdef0123456789abcdef00", id1.toString());
     Assert.assertFalse(id1.isNil());

     try {
-      UniqueId id2 = UniqueId.fromHexString("000000123456789ABCDEF123456789ABCDEF00");
+      UniqueId id2 = UniqueId.fromHexString("000000123456789ABCDEF123456789ABCDEF0123456789ABCDEF00");
       // This shouldn't happen.
       Assert.assertTrue(false);
     } catch (IllegalArgumentException e) {
@@ -34,23 +33,16 @@ public class UniqueIdTest {
     }

     // Test `fromByteBuffer()`
-    byte[] bytes = DatatypeConverter.parseHexBinary("0123456789ABCDEF0123456789ABCDEF01234567");
-    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes, 0, 20);
+    byte[] bytes = DatatypeConverter.parseHexBinary("0123456789ABCDEF0123456789ABCDEF012345670123456789ABCDEF");
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes, 0, 28);
     UniqueId id4 = UniqueId.fromByteBuffer(byteBuffer);
     Assert.assertTrue(Arrays.equals(bytes, id4.getBytes()));
-    Assert.assertEquals("0123456789abcdef0123456789abcdef01234567", id4.toString());
+    Assert.assertEquals("0123456789abcdef0123456789abcdef012345670123456789abcdef", id4.toString());

     // Test `genNil()`
     UniqueId id6 = UniqueId.NIL;
-    Assert.assertEquals("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF".toLowerCase(), id6.toString());
+    Assert.assertEquals("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF".toLowerCase(), id6.toString());
     Assert.assertTrue(id6.isNil());
   }
-
-  @Test
-  void testMurmurHash() {
-    UniqueId id = UniqueId.fromHexString("3131313131313131313132323232323232323232");
-    long remainder = Long.remainderUnsigned(IdUtil.murmurHashCode(id), 1000000000);
-    Assert.assertEquals(remainder, 787616861);
-  }
 }

View file

@@ -142,7 +142,8 @@ class WorkerCrashedError(RayError):
     """Indicates that the worker died unexpectedly while executing a task."""

     def __str__(self):
-        return "The worker died unexpectedly while executing this task."
+        return ("The worker died unexpectedly while executing this task. "
+                "Check python-core-worker-*.log files for more information.")


 class RayActorError(RayError):
@@ -153,7 +154,8 @@ class RayActorError(RayError):
     """

     def __str__(self):
-        return "The actor died unexpectedly before finishing this task."
+        return ("The actor died unexpectedly before finishing this task. "
+                "Check python-core-worker-*.log files for more information.")


 class RaySystemError(RayError):

View file

@@ -12,6 +12,7 @@ import hashlib
 import cython
 import inspect
 import uuid

+import ray.ray_constants as ray_constants

 ctypedef object (*FunctionDescriptor_from_cpp)(const CFunctionDescriptor &)
@@ -188,7 +189,8 @@ cdef class PythonFunctionDescriptor(FunctionDescriptor):
         function_name = function.__name__
         class_name = ""
-        pickled_function_hash = hashlib.sha1(pickled_function).hexdigest()
+        pickled_function_hash = hashlib.shake_128(pickled_function).hexdigest(
+            ray_constants.ID_SIZE)

         return cls(module_name, function_name, class_name,
                    pickled_function_hash)
@@ -208,7 +210,10 @@ cdef class PythonFunctionDescriptor(FunctionDescriptor):
         module_name = target_class.__module__
         class_name = target_class.__name__
         # Use a random uuid as function hash to solve actor name conflict.
-        return cls(module_name, "__init__", class_name, str(uuid.uuid4()))
+        return cls(
+            module_name, "__init__", class_name,
+            hashlib.shake_128(
+                uuid.uuid4().bytes).hexdigest(ray_constants.ID_SIZE))

     @property
     def module_name(self):
@@ -268,14 +273,14 @@ cdef class PythonFunctionDescriptor(FunctionDescriptor):
         Returns:
             ray.ObjectRef to represent the function descriptor.
         """
-        function_id_hash = hashlib.sha1()
+        function_id_hash = hashlib.shake_128()
         # Include the function module and name in the hash.
         function_id_hash.update(self.typed_descriptor.ModuleName())
         function_id_hash.update(self.typed_descriptor.FunctionName())
         function_id_hash.update(self.typed_descriptor.ClassName())
         function_id_hash.update(self.typed_descriptor.FunctionHash())
         # Compute the function ID.
-        function_id = function_id_hash.digest()
+        function_id = function_id_hash.digest(ray_constants.ID_SIZE)
         return ray.FunctionID(function_id)

     def is_actor_method(self):
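
The sha1-to-shake_128 switch here (and in the files below) is needed because a SHA-1 digest is fixed at 20 bytes, which no longer matches the 28-byte ID_SIZE. SHAKE-128 is an extendable-output function, so the digest length is a parameter. A quick illustration with a placeholder payload:

import hashlib

payload = b"pickled-function-bytes"  # placeholder input, not from the commit
print(len(hashlib.sha1(payload).digest()))         # 20: fixed, now too short
print(len(hashlib.shake_128(payload).digest(28)))  # 28: length is an argument
print(hashlib.shake_128(payload).hexdigest(28))    # 56 hex characters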

View file

@@ -31,7 +31,7 @@ def check_id(b, size=kUniqueIDSize):
         raise TypeError("Unsupported type: " + str(type(b)))
     if len(b) != size:
         raise ValueError("ID string needs to have length " +
-                         str(size))
+                         str(size) + ", got " + str(len(b)))

 cdef extern from "ray/common/constants.h" nogil:
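
For reference, a plain-Python sketch of the Cython helper above; the only behavioral change is that the ValueError now reports the length it actually received, which matters when old 20-byte ids meet the new 28-byte requirement:

def check_id(b, size=28):
    # Sketch of check_id with the improved error message.
    if not isinstance(b, bytes):
        raise TypeError("Unsupported type: " + str(type(b)))
    if len(b) != size:
        raise ValueError("ID string needs to have length " + str(size) +
                         ", got " + str(len(b)))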

View file

@@ -22,7 +22,7 @@ from ray.ray_logging import setup_component_logger
 logger = logging.getLogger(__name__)

 # The groups are worker id, job id, and pid.
-JOB_LOG_PATTERN = re.compile(".*worker-([0-9a-f]{40})-(\d+)-(\d+)")
+JOB_LOG_PATTERN = re.compile(".*worker-([0-9a-f]+)-(\d+)-(\d+)")


 class LogFileInfo:
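
The hard-coded {40} only matched the old 20-byte (40 hex character) worker ids; the relaxed pattern also accepts the new 56-character ids. An illustrative match (the file name below is invented for the example):

import re

JOB_LOG_PATTERN = re.compile(r".*worker-([0-9a-f]+)-(\d+)-(\d+)")
name = "logs/worker-" + "ab" * 28 + "-01000000-12345.out"
worker_id, job_id, pid = JOB_LOG_PATTERN.match(name).groups()
print(len(worker_id), job_id, pid)  # 56 01000000 12345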

View file

@@ -19,7 +19,7 @@ def env_bool(key, default):
     return default

-ID_SIZE = 20
+ID_SIZE = 28

 # The default maximum number of bytes to allocate to the object store unless
 # overridden by the user.

View file

@@ -74,7 +74,8 @@ def _try_to_compute_deterministic_class_id(cls, depth=5):
         new_class_id = pickle.dumps(pickle.loads(class_id))
         if new_class_id == class_id:
             # We appear to have reached a fix point, so use this as the ID.
-            return hashlib.sha1(new_class_id).digest()
+            return hashlib.shake_128(new_class_id).digest(
+                ray_constants.ID_SIZE)
         class_id = new_class_id

     # We have not reached a fixed point, so we may end up with a different
@@ -82,7 +83,7 @@ def _try_to_compute_deterministic_class_id(cls, depth=5):
     # same class definition being exported many many times.
     logger.warning(
         f"WARNING: Could not produce a deterministic class ID for class {cls}")
-    return hashlib.sha1(new_class_id).digest()
+    return hashlib.shake_128(new_class_id).digest(ray_constants.ID_SIZE)


 def object_ref_deserializer(reduced_obj_ref, owner_address):

View file

@@ -284,14 +284,14 @@ def test_workers(shutdown_only):

 def test_object_ref_properties():
-    id_bytes = b"00112233445566778899"
+    id_bytes = b"0011223344556677889900001111"
     object_ref = ray.ObjectRef(id_bytes)
     assert object_ref.binary() == id_bytes
     object_ref = ray.ObjectRef.nil()
     assert object_ref.is_nil()
-    with pytest.raises(ValueError, match=r".*needs to have length 20.*"):
+    with pytest.raises(ValueError, match=r".*needs to have length.*"):
         ray.ObjectRef(id_bytes + b"1234")
-    with pytest.raises(ValueError, match=r".*needs to have length 20.*"):
+    with pytest.raises(ValueError, match=r".*needs to have length.*"):
         ray.ObjectRef(b"0123456789")
     object_ref = ray.ObjectRef.from_random()
     assert not object_ref.is_nil()

View file

@@ -741,10 +741,10 @@ ray.get(main_wait.release.remote())

     driver1_out_split = driver1_out.split("\n")
     driver2_out_split = driver2_out.split("\n")
-    assert driver1_out_split[0][-1] == "1"
-    assert driver1_out_split[1][-1] == "2"
-    assert driver2_out_split[0][-1] == "3"
-    assert driver2_out_split[1][-1] == "4"
+    assert driver1_out_split[0][-1] == "1", driver1_out_split
+    assert driver1_out_split[1][-1] == "2", driver1_out_split
+    assert driver2_out_split[0][-1] == "3", driver2_out_split
+    assert driver2_out_split[1][-1] == "4", driver2_out_split


 if __name__ == "__main__":

View file

@@ -50,9 +50,9 @@ def get_ray_temp_dir():

 def _random_string():
-    id_hash = hashlib.sha1()
+    id_hash = hashlib.shake_128()
     id_hash.update(uuid.uuid4().bytes)
-    id_bytes = id_hash.digest()
+    id_bytes = id_hash.digest(ray_constants.ID_SIZE)
     assert len(id_bytes) == ray_constants.ID_SIZE
     return id_bytes

View file

@@ -345,7 +345,8 @@ class Worker:
         # actually run the function locally.
         pickled_function = pickle.dumps(function)
-        function_to_run_id = hashlib.sha1(pickled_function).digest()
+        function_to_run_id = hashlib.shake_128(pickled_function).digest(
+            ray_constants.ID_SIZE)
         key = b"FunctionsToRun:" + function_to_run_id
         # First run the function on the driver.
         # We always run the task locally.

View file

@@ -18,7 +18,7 @@
 #include <stdint.h>

 /// Length of Ray full-length IDs in bytes.
-constexpr size_t kUniqueIDSize = 20;
+constexpr size_t kUniqueIDSize = 28;

 /// An ObjectID's bytes are split into the task ID itself and the index of the
 /// object's creation. This is the maximum width of the object index in bits.

View file

@@ -124,7 +124,7 @@ class JobID : public BaseID<JobID> {
 class ActorID : public BaseID<ActorID> {
  private:
-  static constexpr size_t kUniqueBytesLength = 4;
+  static constexpr size_t kUniqueBytesLength = 12;

  public:
  /// Length of `ActorID` in bytes.
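
How the sizes add up: kUniqueIDSize grows from 20 to 28 because the 8 extra actor bytes flow through the whole ID hierarchy. A sketch of the arithmetic; the JobID and TaskID widths come from the same id.h and are stated here as assumptions, since they are not part of this diff:

JOB_ID_LENGTH = 4
ACTOR_UNIQUE_BYTES = 12          # was 4 before this commit
ACTOR_ID_LENGTH = JOB_ID_LENGTH + ACTOR_UNIQUE_BYTES    # 16, was 8
TASK_UNIQUE_BYTES = 8
TASK_ID_LENGTH = ACTOR_ID_LENGTH + TASK_UNIQUE_BYTES    # 24, was 16
OBJECT_INDEX_BYTES = 4
OBJECT_ID_LENGTH = TASK_ID_LENGTH + OBJECT_INDEX_BYTES  # 28, was 20
assert OBJECT_ID_LENGTH == 28    # matches kUniqueIDSize and ID_SIZE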

View file

@@ -91,6 +91,8 @@ bool ActorManager::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle,
                           std::placeholders::_1, std::placeholders::_2);
     RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe(
         actor_id, actor_notification_callback, nullptr));
+  } else {
+    RAY_LOG(ERROR) << "Actor handle already exists " << actor_id.Hex();
   }
   return inserted;