Enable memstore by default (#6003)

This commit is contained in:
Eric Liang 2019-10-25 21:59:12 -07:00 committed by GitHub
parent f1d2eb5247
commit a5523466a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 23 additions and 51 deletions

View file

@ -657,7 +657,7 @@ cdef class CoreWorker:
raylet_socket.encode("ascii"), job_id.native(),
gcs_options.native()[0], log_dir.encode("utf-8"),
node_ip_address.encode("utf-8"), task_execution_handler,
check_signals, False))
check_signals))
def disconnect(self):
with nogil:

View file

@ -84,8 +84,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
const c_vector[CObjectID] &arg_reference_ids,
const c_vector[CObjectID] &return_ids,
c_vector[shared_ptr[CRayObject]] *returns) nogil,
CRayStatus() nogil,
c_bool use_memory_store_)
CRayStatus() nogil)
void Disconnect()
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()

View file

@ -4,9 +4,6 @@ from libc.stdint cimport uint8_t, uint32_t, int64_t
cdef extern from "ray/common/id.h" namespace "ray" nogil:
cdef cppclass CBaseID[T]:
@staticmethod
T from_random()
@staticmethod
T FromBinary(const c_string &binary)
@ -32,7 +29,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
size_t Size()
@staticmethod
CUniqueID from_random()
CUniqueID FromRandom()
@staticmethod
CUniqueID FromBinary(const c_string &binary)
@ -133,6 +130,9 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
@staticmethod
CObjectID FromBinary(const c_string &binary)
@staticmethod
CObjectID FromRandom()
@staticmethod
const CObjectID Nil()

View file

@ -110,7 +110,7 @@ cdef class UniqueID(BaseID):
@classmethod
def from_random(cls):
return cls(os.urandom(CUniqueID.Size()))
return cls(CUniqueID.FromRandom().Binary())
def size(self):
return CUniqueID.Size()
@ -197,7 +197,7 @@ cdef class ObjectID(BaseID):
@classmethod
def from_random(cls):
return cls(os.urandom(CObjectID.Size()))
return cls(CObjectID.FromRandom().Binary())
cdef class TaskID(BaseID):

View file

@ -9,7 +9,6 @@ import glob
import io
import json
import logging
from multiprocessing import Process
import os
import random
import re
@ -2998,27 +2997,6 @@ def test_object_id_properties():
id_dumps = pickle.dumps(object_id)
id_from_dumps = pickle.loads(id_dumps)
assert id_from_dumps == object_id
file_prefix = "test_object_id_properties"
# Make sure the ids are fork safe.
def write(index):
str = ray.ObjectID.from_random().hex()
with open("{}{}".format(file_prefix, index), "w") as fo:
fo.write(str)
def read(index):
with open("{}{}".format(file_prefix, index), "r") as fi:
for line in fi:
return line
processes = [Process(target=write, args=(_, )) for _ in range(4)]
for process in processes:
process.start()
for process in processes:
process.join()
hexes = {read(i) for i in range(4)}
[os.remove("{}{}".format(file_prefix, i)) for i in range(4)]
assert len(hexes) == 4
@pytest.fixture

View file

@ -11,7 +11,6 @@ import inspect
import io
import json
import logging
import numpy as np
import os
import redis
import signal
@ -1491,7 +1490,7 @@ def connect(node,
# Put something in the plasma store so that subsequent plasma store
# accesses will be faster. Currently the first access is always slow, and
# we don't want the user to experience this.
temporary_object_id = ray.ObjectID(np.random.bytes(20))
temporary_object_id = ray.ObjectID.from_random()
worker.put_object(1, object_id=temporary_object_id)
ray.internal.free([temporary_object_id])

View file

@ -56,6 +56,7 @@ template <typename T>
class BaseID {
public:
BaseID();
// Warning: this can duplicate IDs after a fork() call. We assume this never happens.
static T FromRandom();
static T FromBinary(const std::string &binary);
static const T &Nil();
@ -102,6 +103,7 @@ class JobID : public BaseID<JobID> {
static size_t Size() { return kLength; }
// Warning: this can duplicate IDs after a fork() call. We assume this never happens.
static JobID FromRandom() = delete;
JobID() : BaseID() {}
@ -140,6 +142,7 @@ class ActorID : public BaseID<ActorID> {
/// \return The `ActorID` with unique bytes being nil.
static ActorID NilFromJob(const JobID &job_id);
// Warning: this can duplicate IDs after a fork() call. We assume this never happens.
static ActorID FromRandom() = delete;
/// Constructor of `ActorID`.
@ -167,6 +170,7 @@ class TaskID : public BaseID<TaskID> {
static TaskID ComputeDriverTaskId(const WorkerID &driver_id);
// Warning: this can duplicate IDs after a fork() call. We assume this never happens.
static TaskID FromRandom() = delete;
/// The ID generated for driver task.
@ -310,6 +314,9 @@ class ObjectID : public BaseID<ObjectID> {
/// Create an object id randomly.
///
/// Warning: this can duplicate IDs after a fork() call. We assume this
/// never happens.
///
/// \param transport_type Which type of the transport that is used to
/// transfer this object.
///

View file

@ -46,7 +46,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
const JobID &job_id, const gcs::GcsClientOptions &gcs_options,
const std::string &log_dir, const std::string &node_ip_address,
const TaskExecutionCallback &task_execution_callback,
std::function<Status()> check_signals, bool use_memory_store)
std::function<Status()> check_signals)
: worker_type_(worker_type),
language_(language),
log_dir_(log_dir),
@ -55,8 +55,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
heartbeat_timer_(io_service_),
worker_server_(WorkerTypeString(worker_type), 0 /* let grpc choose a port */),
gcs_client_(gcs_options),
object_interface_(worker_context_, raylet_client_, store_socket, use_memory_store,
check_signals),
object_interface_(worker_context_, raylet_client_, store_socket, check_signals),
task_execution_service_work_(task_execution_service_),
task_execution_callback_(task_execution_callback) {
// Initialize logging if log_dir is passed. Otherwise, it must be initialized

View file

@ -46,20 +46,14 @@ class CoreWorker {
/// \parma[in] check_signals Language worker function to check for signals and handle
/// them. If the function returns anything but StatusOK, any long-running
/// operations in the core worker will short circuit and return that status.
/// \param[in] use_memory_store Whether or not to use the in-memory object store
/// in addition to the plasma store.
///
/// NOTE(zhijunfu): the constructor would throw if a failure happens.
/// NOTE(edoakes): the use_memory_store flag is a stop-gap solution to the issue
/// that randomly generated ObjectIDs may use the memory store
/// instead of the plasma store.
CoreWorker(const WorkerType worker_type, const Language language,
const std::string &store_socket, const std::string &raylet_socket,
const JobID &job_id, const gcs::GcsClientOptions &gcs_options,
const std::string &log_dir, const std::string &node_ip_address,
const TaskExecutionCallback &task_execution_callback,
std::function<Status()> check_signals = nullptr,
bool use_memory_store = true);
std::function<Status()> check_signals = nullptr);
~CoreWorker();

View file

@ -25,7 +25,7 @@ void CoreWorkerObjectInterface::GroupObjectIdsByStoreProvider(
// and are only used locally.
// Thus we need to check whether this object is a task return object in additional
// to whether it's from direct actor call before we can choose memory store provider.
if (use_memory_store_ && object_id.IsReturnObject() &&
if (object_id.IsReturnObject() &&
object_id.GetTransportType() ==
static_cast<uint8_t>(TaskTransportType::DIRECT_ACTOR)) {
type = StoreProviderType::MEMORY;
@ -37,12 +37,10 @@ void CoreWorkerObjectInterface::GroupObjectIdsByStoreProvider(
CoreWorkerObjectInterface::CoreWorkerObjectInterface(
WorkerContext &worker_context, std::unique_ptr<RayletClient> &raylet_client,
const std::string &store_socket, bool use_memory_store,
std::function<Status()> check_signals)
const std::string &store_socket, std::function<Status()> check_signals)
: worker_context_(worker_context),
raylet_client_(raylet_client),
store_socket_(store_socket),
use_memory_store_(use_memory_store),
memory_store_(std::make_shared<CoreWorkerMemoryStore>()) {
check_signals_ = check_signals;
AddStoreProvider(StoreProviderType::PLASMA);

View file

@ -20,11 +20,9 @@ class CoreWorkerObjectInterface {
public:
/// \param[in] worker_context WorkerContext of the parent CoreWorker.
/// \param[in] store_socket Path to the plasma store socket.
/// \param[in] use_memory_store Whether or not to use the in-memory object store
/// in addition to the plasma store.
CoreWorkerObjectInterface(WorkerContext &worker_context,
std::unique_ptr<RayletClient> &raylet_client,
const std::string &store_socket, bool use_memory_store = true,
const std::string &store_socket,
std::function<Status()> check_signals = nullptr);
/// Set options for this client's interactions with the object store.
@ -164,7 +162,6 @@ class CoreWorkerObjectInterface {
std::unique_ptr<RayletClient> &raylet_client_;
std::string store_socket_;
bool use_memory_store_;
/// In-memory store for return objects. This is used for `MEMORY` store provider.
std::shared_ptr<CoreWorkerMemoryStore> memory_store_;

View file

@ -89,6 +89,7 @@ template <typename Key, typename T>
using EnumUnorderedMap = std::unordered_map<Key, T, EnumClassHash>;
/// A helper function to fill random bytes into the `data`.
/// Warning: this is not fork-safe, we need to re-seed after that.
template <typename T>
void FillRandom(T *data) {
RAY_CHECK(data != nullptr);