Mirror of https://github.com/vale981/ray, synced 2025-03-06 02:21:39 -05:00

Commit b92531918e: Make use of C++14 'make_unique' (#14663)
Parent: a65002514c
32 changed files with 96 additions and 116 deletions
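This commit applies one mechanical transformation across the codebase: every
std::unique_ptr<T>(new T(args...)) becomes std::make_unique<T>(args...), which
entered the standard library in C++14. The rewrite spells the type once,
removes the naked 'new', and is exception safe when the allocation sits inside
a larger full expression. A minimal sketch of the before/after pattern, using
a hypothetical Widget type that is not part of the Ray codebase:

    // Sketch of the pattern this commit applies; Widget is hypothetical.
    #include <memory>
    #include <string>

    struct Widget {
      Widget(std::string name, int size) : name_(std::move(name)), size_(size) {}
      std::string name_;
      int size_;
    };

    int main() {
      // Before (C++11): the type is spelled twice and 'new' appears directly.
      std::unique_ptr<Widget> a = std::unique_ptr<Widget>(new Widget("a", 1));

      // After (C++14): one mention of the type and no naked 'new'.
      auto b = std::make_unique<Widget>("b", 2);
      return 0;
    }

The hunks below are this same rewrite applied at each call site, plus a few
neighboring cleanups noted along the way.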
@@ -13,8 +13,8 @@ namespace api {
 
 LocalModeRayRuntime::LocalModeRayRuntime(std::shared_ptr<RayConfig> config) {
   config_ = config;
-  worker_ = std::unique_ptr<WorkerContext>(new WorkerContext(
-      WorkerType::DRIVER, ComputeDriverIdFromJob(JobID::Nil()), JobID::Nil()));
+  worker_ = std::make_unique<WorkerContext>(
+      WorkerType::DRIVER, ComputeDriverIdFromJob(JobID::Nil()), JobID::Nil());
   object_store_ = std::unique_ptr<ObjectStore>(new LocalModeObjectStore(*this));
   task_submitter_ = std::unique_ptr<TaskSubmitter>(new LocalModeTaskSubmitter(*this));
 }
@@ -15,7 +15,7 @@ NativeRayRuntime::NativeRayRuntime(std::shared_ptr<RayConfig> config) {
   config_ = config;
   object_store_ = std::unique_ptr<ObjectStore>(new NativeObjectStore(*this));
   task_submitter_ = std::unique_ptr<TaskSubmitter>(new NativeTaskSubmitter());
-  task_executor_ = std::unique_ptr<TaskExecutor>(new TaskExecutor(*this));
+  task_executor_ = std::make_unique<TaskExecutor>(*this);
 }
 
 } // namespace api
@@ -14,8 +14,7 @@ namespace ray {
 namespace api {
 
 LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_tuntime)
     : local_mode_ray_tuntime_(local_mode_ray_tuntime) {
-  memory_store_ =
-      std::unique_ptr<::ray::CoreWorkerMemoryStore>(new ::ray::CoreWorkerMemoryStore());
+  memory_store_ = std::make_unique<::ray::CoreWorkerMemoryStore>();
 }
 
 void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
@@ -19,7 +19,7 @@ TaskExecutor::TaskExecutor(AbstractRayRuntime &abstract_ray_tuntime_)
 // cluster mode.
 std::unique_ptr<ObjectID> TaskExecutor::Execute(InvocationSpec &invocation) {
   abstract_ray_tuntime_.GetWorkerContext();
-  return std::unique_ptr<ObjectID>(new ObjectID());
+  return std::make_unique<ObjectID>();
 };
 
 Status TaskExecutor::ExecuteTask(
@@ -24,8 +24,7 @@ IOServicePool::~IOServicePool() {}
 
 void IOServicePool::Run() {
   for (size_t i = 0; i < io_service_num_; ++i) {
-    io_services_.emplace_back(
-        std::unique_ptr<instrumented_io_context>(new instrumented_io_context));
+    io_services_.emplace_back(std::make_unique<instrumented_io_context>());
     instrumented_io_context &io_service = *io_services_[i];
     threads_.emplace_back([&io_service] {
       boost::asio::io_service::work work(io_service);
@@ -192,7 +192,7 @@ void ServerConnection::WriteMessageAsync(
   async_writes_ += 1;
   bytes_written_ += length;
 
-  auto write_buffer = std::unique_ptr<AsyncWriteBuffer>(new AsyncWriteBuffer());
+  auto write_buffer = std::make_unique<AsyncWriteBuffer>();
   write_buffer->write_cookie = RayConfig::instance().ray_cookie();
   write_buffer->write_type = type;
   write_buffer->write_length = length;
@@ -239,7 +239,7 @@ bool WorkerContext::CurrentActorDetached() const { return is_detached_actor_; }
 
 WorkerThreadContext &WorkerContext::GetThreadContext() {
   if (thread_context_ == nullptr) {
-    thread_context_ = std::unique_ptr<WorkerThreadContext>(new WorkerThreadContext());
+    thread_context_ = std::make_unique<WorkerThreadContext>();
   }
 
   return *thread_context_;
@@ -85,7 +85,7 @@ thread_local std::weak_ptr<CoreWorker> CoreWorkerProcess::current_core_worker_;
 
 void CoreWorkerProcess::Initialize(const CoreWorkerOptions &options) {
   RAY_CHECK(!instance_) << "The process is already initialized for core worker.";
-  instance_ = std::unique_ptr<CoreWorkerProcess>(new CoreWorkerProcess(options));
+  instance_.reset(new CoreWorkerProcess(options));
 }
 
 void CoreWorkerProcess::Shutdown() {
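One call site above keeps a raw 'new': instance_ is assigned with reset rather
than make_unique. A plausible reason (an assumption here, not stated in the
commit) is that the CoreWorkerProcess constructor is private, and
std::make_unique cannot call a private constructor because the construction
happens inside library code rather than in a member function. A sketch with a
hypothetical Singleton type:

    #include <memory>

    class Singleton {
     public:
      static void Initialize() {
        // instance_ = std::make_unique<Singleton>();  // would not compile:
        // the private constructor is inaccessible from inside make_unique.
        instance_.reset(new Singleton());  // fine: member code can call it
      }

     private:
      Singleton() = default;  // private constructor, as in many singletons
      static std::unique_ptr<Singleton> instance_;
    };

    std::unique_ptr<Singleton> Singleton::instance_;

    int main() {
      Singleton::Initialize();
      return 0;
    }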
@@ -361,10 +361,9 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
     auto execute_task =
         std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1,
                   std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
-    direct_task_receiver_ =
-        std::unique_ptr<CoreWorkerDirectTaskReceiver>(new CoreWorkerDirectTaskReceiver(
-            worker_context_, task_execution_service_, execute_task,
-            [this] { return local_raylet_client_->TaskDone(); }));
+    direct_task_receiver_ = std::make_unique<CoreWorkerDirectTaskReceiver>(
+        worker_context_, task_execution_service_, execute_task,
+        [this] { return local_raylet_client_->TaskDone(); });
   }
 
   // Initialize raylet client.
@@ -380,11 +379,11 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
   NodeID local_raylet_id;
   int assigned_port;
   std::string serialized_job_config = options_.serialized_job_config;
-  local_raylet_client_ = std::shared_ptr<raylet::RayletClient>(new raylet::RayletClient(
+  local_raylet_client_ = std::make_shared<raylet::RayletClient>(
       io_service_, std::move(grpc_client), options_.raylet_socket, GetWorkerID(),
       options_.worker_type, worker_context_.GetCurrentJobID(), options_.language,
       options_.node_ip_address, &raylet_client_status, &local_raylet_id, &assigned_port,
-      &serialized_job_config));
+      &serialized_job_config);
 
   if (!raylet_client_status.ok()) {
     // Avoid using FATAL log or RAY_CHECK here because they may create a core dump file.
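The hunk above uses the shared_ptr analogue, std::make_shared, which has been
available since C++11. Besides the same conciseness, it performs a single
allocation holding both the object and its reference-count control block,
where std::shared_ptr<T>(new T(...)) performs two. A sketch with a
hypothetical Session type:

    #include <memory>

    struct Session {
      explicit Session(int id) : id_(id) {}
      int id_;
    };

    int main() {
      // Two allocations: one for the Session, one for the control block.
      std::shared_ptr<Session> a(new Session(1));

      // One allocation holding both.
      auto b = std::make_shared<Session>(2);
      return 0;
    }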
@@ -407,8 +406,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
 
   // Start RPC server after all the task receivers are properly initialized and we have
   // our assigned port from the raylet.
-  core_worker_server_ = std::unique_ptr<rpc::GrpcServer>(
-      new rpc::GrpcServer(WorkerTypeString(options_.worker_type), assigned_port));
+  core_worker_server_ = std::make_unique<rpc::GrpcServer>(
+      WorkerTypeString(options_.worker_type), assigned_port);
   core_worker_server_->RegisterService(grpc_service_);
   core_worker_server_->Run();
 
@@ -575,14 +574,12 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
                 reference_counter_, node_addr_factory, rpc_address_))
           : std::shared_ptr<LeasePolicyInterface>(
                 std::make_shared<LocalLeasePolicy>(rpc_address_));
-  direct_task_submitter_ =
-      std::unique_ptr<CoreWorkerDirectTaskSubmitter>(new CoreWorkerDirectTaskSubmitter(
-          rpc_address_, local_raylet_client_, core_worker_client_pool_,
-          raylet_client_factory, std::move(lease_policy), memory_store_, task_manager_,
-          local_raylet_id, RayConfig::instance().worker_lease_timeout_milliseconds(),
-          std::move(actor_creator),
-          RayConfig::instance().max_tasks_in_flight_per_worker(),
-          boost::asio::steady_timer(io_service_)));
+  direct_task_submitter_ = std::make_unique<CoreWorkerDirectTaskSubmitter>(
+      rpc_address_, local_raylet_client_, core_worker_client_pool_, raylet_client_factory,
+      std::move(lease_policy), memory_store_, task_manager_, local_raylet_id,
+      RayConfig::instance().worker_lease_timeout_milliseconds(), std::move(actor_creator),
+      RayConfig::instance().max_tasks_in_flight_per_worker(),
+      boost::asio::steady_timer(io_service_));
   auto report_locality_data_callback =
       [this](const ObjectID &object_id, const absl::flat_hash_set<NodeID> &locations,
              uint64_t object_size) {
@@ -598,8 +595,8 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
                                          task_argument_waiter_);
   }
 
-  actor_manager_ = std::unique_ptr<ActorManager>(
-      new ActorManager(gcs_client_, direct_actor_submitter_, reference_counter_));
+  actor_manager_ = std::make_unique<ActorManager>(gcs_client_, direct_actor_submitter_,
+                                                  reference_counter_);
 
   std::function<Status(const ObjectID &object_id, const ObjectLookupCallback &callback)>
       object_lookup_fn;
@@ -651,16 +648,15 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
       });
     };
   }
-  object_recovery_manager_ =
-      std::unique_ptr<ObjectRecoveryManager>(new ObjectRecoveryManager(
-          rpc_address_, raylet_client_factory, local_raylet_client_, object_lookup_fn,
-          task_manager_, reference_counter_, memory_store_,
-          [this](const ObjectID &object_id, bool pin_object) {
-            RAY_CHECK_OK(Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE),
-                             /*contained_object_ids=*/{}, object_id,
-                             /*pin_object=*/pin_object));
-          },
-          RayConfig::instance().lineage_pinning_enabled()));
+  object_recovery_manager_ = std::make_unique<ObjectRecoveryManager>(
+      rpc_address_, raylet_client_factory, local_raylet_client_, object_lookup_fn,
+      task_manager_, reference_counter_, memory_store_,
+      [this](const ObjectID &object_id, bool pin_object) {
+        RAY_CHECK_OK(Put(RayObject(rpc::ErrorType::OBJECT_UNRECONSTRUCTABLE),
+                         /*contained_object_ids=*/{}, object_id,
+                         /*pin_object=*/pin_object));
+      },
+      RayConfig::instance().lineage_pinning_enabled());
 
   // Start the IO thread after all other members have been initialized, in case
   // the thread calls back into any of our members.
@@ -1809,7 +1805,7 @@ std::pair<std::shared_ptr<const ActorHandle>, Status> CoreWorker::GetNamedActorH
       name, [this, &actor_id, name, ready_promise](
                 Status status, const boost::optional<rpc::ActorTableData> &result) {
         if (status.ok() && result) {
-          auto actor_handle = std::unique_ptr<ActorHandle>(new ActorHandle(*result));
+          auto actor_handle = std::make_unique<ActorHandle>(*result);
           actor_id = actor_handle->GetActorID();
           actor_manager_->AddNewActorHandle(std::move(actor_handle), GetCallerId(),
                                             CurrentCallSite(), rpc_address_,
@@ -1863,8 +1859,7 @@ const ResourceMappingType CoreWorker::GetResourceIDs() const {
 
 std::unique_ptr<worker::ProfileEvent> CoreWorker::CreateProfileEvent(
     const std::string &event_type) {
-  return std::unique_ptr<worker::ProfileEvent>(
-      new worker::ProfileEvent(profiler_, event_type));
+  return std::make_unique<worker::ProfileEvent>(profiler_, event_type);
 }
 
 void CoreWorker::RunTaskExecutionLoop() { task_execution_service_.run(); }
@@ -30,7 +30,7 @@ class ReferenceCountTest : public ::testing::Test {
   std::unique_ptr<ReferenceCounter> rc;
   virtual void SetUp() {
     rpc::Address addr;
-    rc = std::unique_ptr<ReferenceCounter>(new ReferenceCounter(addr));
+    rc = std::make_unique<ReferenceCounter>(addr);
   }
 
   virtual void TearDown() {}
@@ -41,10 +41,9 @@ class ReferenceCountLineageEnabledTest : public ::testing::Test {
   std::unique_ptr<ReferenceCounter> rc;
   virtual void SetUp() {
     rpc::Address addr;
-    rc = std::unique_ptr<ReferenceCounter>(
-        new ReferenceCounter(addr,
-                             /*distributed_ref_counting_enabled=*/true,
-                             /*lineage_pinning_enabled=*/true));
+    rc = std::make_unique<ReferenceCounter>(addr,
+                                            /*distributed_ref_counting_enabled=*/true,
+                                            /*lineage_pinning_enabled=*/true);
   }
 
   virtual void TearDown() {}
@@ -543,7 +543,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) {
     auto task_spec = builder.Build();
 
     ASSERT_TRUE(task_spec.IsActorTask());
-    auto request = std::unique_ptr<rpc::PushTaskRequest>(new rpc::PushTaskRequest);
+    auto request = std::make_unique<rpc::PushTaskRequest>();
     request->mutable_task_spec()->Swap(&task_spec.GetMutableMessage());
   }
   RAY_LOG(INFO) << "Finish creating " << num_tasks << " PushTaskRequests"
@@ -485,9 +485,8 @@ class DirectActorReceiverTest : public ::testing::Test {
     auto execute_task =
         std::bind(&DirectActorReceiverTest::MockExecuteTask, this, std::placeholders::_1,
                   std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
-    receiver_ = std::unique_ptr<CoreWorkerDirectTaskReceiver>(
-        new CoreWorkerDirectTaskReceiver(worker_context_, main_io_service_, execute_task,
-                                         [] { return Status::OK(); }));
+    receiver_ = std::make_unique<CoreWorkerDirectTaskReceiver>(
+        worker_context_, main_io_service_, execute_task, [] { return Status::OK(); });
     receiver_->Init(std::make_shared<rpc::CoreWorkerClientPool>(
                         [&](const rpc::Address &addr) { return worker_client_; }),
                     rpc_address_, dependency_waiter_);
@@ -276,7 +276,7 @@ void CoreWorkerDirectActorTaskSubmitter::ResendOutOfOrderTasks(const ActorID &ac
 void CoreWorkerDirectActorTaskSubmitter::PushActorTask(const ClientQueue &queue,
                                                        const TaskSpecification &task_spec,
                                                        bool skip_queue) {
-  auto request = std::unique_ptr<rpc::PushTaskRequest>(new rpc::PushTaskRequest());
+  auto request = std::make_unique<rpc::PushTaskRequest>();
   // NOTE(swang): CopyFrom is needed because if we use Swap here and the task
   // fails, then the task data will be gone when the TaskManager attempts to
   // access the task.
@@ -350,7 +350,7 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
     const SchedulingKey &scheduling_key, const TaskSpecification &task_spec,
     const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources) {
   auto task_id = task_spec.TaskId();
-  auto request = std::unique_ptr<rpc::PushTaskRequest>(new rpc::PushTaskRequest);
+  auto request = std::make_unique<rpc::PushTaskRequest>();
   bool is_actor = task_spec.IsActorTask();
   bool is_actor_creation = task_spec.IsActorCreationTask();
 
@@ -292,8 +292,8 @@ void GcsServer::InitStatsHandler() {
 }
 
 void GcsServer::InitGcsWorkerManager() {
-  gcs_worker_manager_ = std::unique_ptr<GcsWorkerManager>(
-      new GcsWorkerManager(gcs_table_storage_, gcs_pub_sub_));
+  gcs_worker_manager_ =
+      std::make_unique<GcsWorkerManager>(gcs_table_storage_, gcs_pub_sub_);
   // Register service.
   worker_info_service_.reset(
       new rpc::WorkerInfoGrpcService(main_service_, *gcs_worker_manager_));
@@ -344,7 +344,7 @@ void ObjectManager::Push(const ObjectID &object_id, const NodeID &node_id) {
   if (nodes.count(node_id) == 0) {
     // If config_.push_timeout_ms < 0, we give an empty timer
     // and the task will be kept infinitely.
-    auto timer = std::unique_ptr<boost::asio::deadline_timer>();
+    std::unique_ptr<boost::asio::deadline_timer> timer;
    if (config_.push_timeout_ms == 0) {
      // The Push request fails directly when config_.push_timeout_ms == 0.
      RAY_LOG(WARNING) << "Invalid Push request ObjectID " << object_id
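This hunk is the one place where the cleanup does not introduce make_unique:
the old code value-initialized a unique_ptr with std::unique_ptr<T>(), which
is simply a null pointer, so a plain declaration expresses the same thing. A
sketch with a hypothetical Timer type standing in for the Boost timer:

    #include <cassert>
    #include <memory>

    struct Timer {};

    int main() {
      std::unique_ptr<Timer> timer;             // default-constructed: null
      auto verbose = std::unique_ptr<Timer>();  // same value, more ceremony
      assert(!timer && !verbose);               // both are empty
      return 0;
    }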
@@ -327,9 +327,8 @@ class ObjectManager : public ObjectManagerInterface,
     WaitState(instrumented_io_context &service, int64_t timeout_ms,
               const WaitCallback &callback)
         : timeout_ms(timeout_ms),
-          timeout_timer(std::unique_ptr<boost::asio::deadline_timer>(
-              new boost::asio::deadline_timer(
-                  service, boost::posix_time::milliseconds(timeout_ms)))),
+          timeout_timer(std::make_unique<boost::asio::deadline_timer>(
+              service, boost::posix_time::milliseconds(timeout_ms))),
           callback(callback) {}
     /// The period of time to wait before invoking the callback.
     int64_t timeout_ms;
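make_unique also works in a constructor's member-initializer list, as the
WaitState change above shows. A sketch with hypothetical Timer and Waiter
types:

    #include <memory>

    struct Timer {
      explicit Timer(int ms) : ms_(ms) {}
      int ms_;
    };

    struct Waiter {
      // The smart pointer member is initialized directly in the list.
      explicit Waiter(int timeout_ms)
          : timeout_timer_(std::make_unique<Timer>(timeout_ms)) {}
      std::unique_ptr<Timer> timeout_timer_;
    };

    int main() {
      Waiter w(100);
      return 0;
    }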
@@ -218,8 +218,7 @@ uint8_t *PlasmaClient::Impl::GetStoreFdAndMmap(MEMFD_TYPE store_fd_val,
   } else {
     MEMFD_TYPE fd;
     RAY_CHECK_OK(store_conn_->RecvFd(&fd));
-    mmap_table_[store_fd_val] =
-        std::unique_ptr<ClientMmapTableEntry>(new ClientMmapTableEntry(fd, map_size));
+    mmap_table_[store_fd_val] = std::make_unique<ClientMmapTableEntry>(fd, map_size);
     return mmap_table_[store_fd_val]->pointer();
   }
 }
@@ -248,8 +247,7 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID &object_id,
   if (elem == objects_in_use_.end()) {
     // Add this object ID to the hash table of object IDs in use. The
     // corresponding call to free happens in PlasmaClient::Release.
-    objects_in_use_[object_id] =
-        std::unique_ptr<ObjectInUseEntry>(new ObjectInUseEntry());
+    objects_in_use_[object_id] = std::make_unique<ObjectInUseEntry>();
     objects_in_use_[object_id]->object = *object;
     objects_in_use_[object_id]->count = 0;
     objects_in_use_[object_id]->is_sealed = is_sealed;
@@ -61,7 +61,7 @@ bool QuotaAwarePolicy::SetClientQuota(Client *client, int64_t output_memory_quot
   // those objects will be lazily evicted on the next call
   cache_.AdjustCapacity(-output_memory_quota);
   per_client_cache_[client] =
-      std::unique_ptr<LRUCache>(new LRUCache(client->name, output_memory_quota));
+      std::make_unique<LRUCache>(client->name, output_memory_quota);
   return true;
 }
 
@@ -294,7 +294,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID &object_id,
     return PlasmaError::OutOfMemory;
   }
 
-  auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+  auto ptr = std::make_unique<ObjectTableEntry>();
   entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get();
   entry->data_size = data_size;
   entry->metadata_size = metadata_size;
@@ -2109,8 +2109,8 @@ void NodeManager::HandleFormatGlobalMemoryInfo(
 
   // Fetch from remote nodes.
   for (const auto &entry : remote_node_manager_addresses_) {
-    std::unique_ptr<rpc::NodeManagerClient> client(new rpc::NodeManagerClient(
-        entry.second.first, entry.second.second, client_call_manager_));
+    auto client = std::make_unique<rpc::NodeManagerClient>(
+        entry.second.first, entry.second.second, client_call_manager_);
     client->GetNodeStats(
         stats_req, [replies, store_reply](const ray::Status &status,
                                           const rpc::GetNodeStatsReply &r) {
@@ -1029,8 +1029,7 @@ TEST_F(ClusterTaskManagerTest, TestSpillWaitingTasks) {
   for (int i = 0; i < 5; i++) {
     Task task = CreateTask({{ray::kCPU_ResourceLabel, 8}}, /*num_args=*/1);
     tasks.push_back(task);
-    replies.push_back(std::unique_ptr<rpc::RequestWorkerLeaseReply>(
-        new rpc::RequestWorkerLeaseReply()));
+    replies.push_back(std::make_unique<rpc::RequestWorkerLeaseReply>());
     // All tasks except the last one added are waiting for dependencies.
     if (i < 4) {
       auto missing_arg = task.GetTaskSpecification().GetDependencyIds()[0];
@@ -89,8 +89,7 @@ void Worker::Connect(int port) {
   rpc::Address addr;
   addr.set_ip_address(ip_address_);
   addr.set_port(port_);
-  rpc_client_ = std::unique_ptr<rpc::CoreWorkerClient>(
-      new rpc::CoreWorkerClient(addr, client_call_manager_));
+  rpc_client_ = std::make_unique<rpc::CoreWorkerClient>(addr, client_call_manager_);
 }
 
 void Worker::Connect(std::shared_ptr<rpc::CoreWorkerClientInterface> rpc_client) {
@@ -92,7 +92,7 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service, const NodeID node_id
   }
   // Initialize free ports list with all ports in the specified range.
   if (!worker_ports.empty()) {
-    free_ports_ = std::unique_ptr<std::queue<int>>(new std::queue<int>());
+    free_ports_ = std::make_unique<std::queue<int>>();
     for (int port : worker_ports) {
       free_ports_->push(port);
     }
@@ -102,7 +102,7 @@ WorkerPool::WorkerPool(instrumented_io_context &io_service, const NodeID node_id
     }
     RAY_CHECK(min_worker_port > 0 && min_worker_port <= 65535);
     RAY_CHECK(max_worker_port >= min_worker_port && max_worker_port <= 65535);
-    free_ports_ = std::unique_ptr<std::queue<int>>(new std::queue<int>());
+    free_ports_ = std::make_unique<std::queue<int>>();
     for (int port = min_worker_port; port <= max_worker_port; port++) {
       free_ports_->push(port);
     }
@@ -211,8 +211,7 @@ class WorkerPoolTest : public ::testing::Test {
   }
 
   void SetWorkerCommands(const WorkerCommandMap &worker_commands) {
-    worker_pool_ =
-        std::unique_ptr<WorkerPoolMock>(new WorkerPoolMock(io_service_, worker_commands));
+    worker_pool_ = std::make_unique<WorkerPoolMock>(io_service_, worker_commands);
     rpc::JobConfig job_config;
     job_config.set_num_java_workers_per_process(NUM_WORKERS_PER_PROCESS_JAVA);
     RegisterDriver(Language::PYTHON, JOB_ID, job_config);
@@ -88,9 +88,7 @@ raylet::RayletClient::RayletClient(
     const std::string &ip_address, Status *status, NodeID *raylet_id, int *port,
     std::string *serialized_job_config)
     : grpc_client_(std::move(grpc_client)), worker_id_(worker_id), job_id_(job_id) {
-  // For C++14, we could use std::make_unique
-  conn_ = std::unique_ptr<raylet::RayletConnection>(
-      new raylet::RayletConnection(io_service, raylet_socket, -1, -1));
+  conn_ = std::make_unique<raylet::RayletConnection>(io_service, raylet_socket, -1, -1);
 
   flatbuffers::FlatBufferBuilder fbb;
   // TODO(suquark): Use `WorkerType` in `common.proto` without converting to int.
@@ -31,8 +31,8 @@ class AgentManagerClient {
   /// \param[in] client_call_manager The `ClientCallManager` used for managing requests.
   AgentManagerClient(const std::string &address, const int port,
                      ClientCallManager &client_call_manager) {
-    grpc_client_ = std::unique_ptr<GrpcClient<AgentManagerService>>(
-        new GrpcClient<AgentManagerService>(address, port, client_call_manager));
+    grpc_client_ = std::make_unique<GrpcClient<AgentManagerService>>(address, port,
+                                                                     client_call_manager);
   };
 
   /// Register agent service to the agent manager server
@@ -91,30 +91,28 @@ class GcsRpcClient {
 
   void Reset(const std::string &address, const int port,
              ClientCallManager &client_call_manager) {
-    job_info_grpc_client_ = std::unique_ptr<GrpcClient<JobInfoGcsService>>(
-        new GrpcClient<JobInfoGcsService>(address, port, client_call_manager));
-    actor_info_grpc_client_ = std::unique_ptr<GrpcClient<ActorInfoGcsService>>(
-        new GrpcClient<ActorInfoGcsService>(address, port, client_call_manager));
-    node_info_grpc_client_ = std::unique_ptr<GrpcClient<NodeInfoGcsService>>(
-        new GrpcClient<NodeInfoGcsService>(address, port, client_call_manager));
+    job_info_grpc_client_ = std::make_unique<GrpcClient<JobInfoGcsService>>(
+        address, port, client_call_manager);
+    actor_info_grpc_client_ = std::make_unique<GrpcClient<ActorInfoGcsService>>(
+        address, port, client_call_manager);
+    node_info_grpc_client_ = std::make_unique<GrpcClient<NodeInfoGcsService>>(
+        address, port, client_call_manager);
     node_resource_info_grpc_client_ =
-        std::unique_ptr<GrpcClient<NodeResourceInfoGcsService>>(
-            new GrpcClient<NodeResourceInfoGcsService>(address, port,
-                                                       client_call_manager));
-    heartbeat_info_grpc_client_ = std::unique_ptr<GrpcClient<HeartbeatInfoGcsService>>(
-        new GrpcClient<HeartbeatInfoGcsService>(address, port, client_call_manager));
-    object_info_grpc_client_ = std::unique_ptr<GrpcClient<ObjectInfoGcsService>>(
-        new GrpcClient<ObjectInfoGcsService>(address, port, client_call_manager));
-    task_info_grpc_client_ = std::unique_ptr<GrpcClient<TaskInfoGcsService>>(
-        new GrpcClient<TaskInfoGcsService>(address, port, client_call_manager));
-    stats_grpc_client_ = std::unique_ptr<GrpcClient<StatsGcsService>>(
-        new GrpcClient<StatsGcsService>(address, port, client_call_manager));
-    worker_info_grpc_client_ = std::unique_ptr<GrpcClient<WorkerInfoGcsService>>(
-        new GrpcClient<WorkerInfoGcsService>(address, port, client_call_manager));
+        std::make_unique<GrpcClient<NodeResourceInfoGcsService>>(address, port,
+                                                                 client_call_manager);
+    heartbeat_info_grpc_client_ = std::make_unique<GrpcClient<HeartbeatInfoGcsService>>(
+        address, port, client_call_manager);
+    object_info_grpc_client_ = std::make_unique<GrpcClient<ObjectInfoGcsService>>(
+        address, port, client_call_manager);
+    task_info_grpc_client_ = std::make_unique<GrpcClient<TaskInfoGcsService>>(
+        address, port, client_call_manager);
+    stats_grpc_client_ =
+        std::make_unique<GrpcClient<StatsGcsService>>(address, port, client_call_manager);
+    worker_info_grpc_client_ = std::make_unique<GrpcClient<WorkerInfoGcsService>>(
+        address, port, client_call_manager);
     placement_group_info_grpc_client_ =
-        std::unique_ptr<GrpcClient<PlacementGroupInfoGcsService>>(
-            new GrpcClient<PlacementGroupInfoGcsService>(address, port,
-                                                         client_call_manager));
+        std::make_unique<GrpcClient<PlacementGroupInfoGcsService>>(address, port,
+                                                                   client_call_manager);
   }
 
   /// Add job info to GCS Service.
@@ -39,8 +39,8 @@ class MetricsAgentClient {
                      ClientCallManager &client_call_manager) {
     RAY_LOG(DEBUG) << "Initiate the metrics client of address:" << address
                    << " port:" << port;
-    grpc_client_ = std::unique_ptr<GrpcClient<ReporterService>>(
-        new GrpcClient<ReporterService>(address, port, client_call_manager));
+    grpc_client_ =
+        std::make_unique<GrpcClient<ReporterService>>(address, port, client_call_manager);
   };
 
   /// Report metrics to metrics agent.
@@ -37,8 +37,8 @@ class NodeManagerClient {
   /// \param[in] client_call_manager The `ClientCallManager` used for managing requests.
   NodeManagerClient(const std::string &address, const int port,
                     ClientCallManager &client_call_manager) {
-    grpc_client_ = std::unique_ptr<GrpcClient<NodeManagerService>>(
-        new GrpcClient<NodeManagerService>(address, port, client_call_manager));
+    grpc_client_ = std::make_unique<GrpcClient<NodeManagerService>>(address, port,
+                                                                    client_call_manager);
   };
 
   /// Get current node stats.
@@ -120,8 +120,8 @@ class NodeManagerWorkerClient
   /// \param[in] client_call_manager The `ClientCallManager` used for managing requests.
   NodeManagerWorkerClient(const std::string &address, const int port,
                           ClientCallManager &client_call_manager) {
-    grpc_client_ = std::unique_ptr<GrpcClient<NodeManagerService>>(
-        new GrpcClient<NodeManagerService>(address, port, client_call_manager));
+    grpc_client_ = std::make_unique<GrpcClient<NodeManagerService>>(address, port,
+                                                                    client_call_manager);
  };
 
   /// The RPC client.
@@ -210,9 +210,8 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
   /// \param[in] client_call_manager The `ClientCallManager` used for managing requests.
   CoreWorkerClient(const rpc::Address &address, ClientCallManager &client_call_manager)
       : addr_(address) {
-    grpc_client_ =
-        std::unique_ptr<GrpcClient<CoreWorkerService>>(new GrpcClient<CoreWorkerService>(
-            addr_.ip_address(), addr_.port(), client_call_manager));
+    grpc_client_ = std::make_unique<GrpcClient<CoreWorkerService>>(
+        addr_.ip_address(), addr_.port(), client_call_manager);
   };
 
   const rpc::Address &Addr() const override { return addr_; }
@@ -39,7 +39,7 @@ std::unique_ptr<LocalMemoryBuffer> Message::ToBytes() {
 
   // COPY
   std::unique_ptr<LocalMemoryBuffer> buffer =
-      std::unique_ptr<LocalMemoryBuffer>(new LocalMemoryBuffer(bytes, total_len, true));
+      std::make_unique<LocalMemoryBuffer>(bytes, total_len, true);
   delete bytes;
   return buffer;
 }
@@ -155,8 +155,8 @@ std::shared_ptr<WriterQueue> UpstreamQueueMessageHandler::CreateUpstreamQueue(
     return queue;
   }
 
-  queue = std::unique_ptr<streaming::WriterQueue>(new streaming::WriterQueue(
-      queue_id, actor_id_, peer_actor_id, size, GetOutTransport(queue_id)));
+  queue = std::make_unique<streaming::WriterQueue>(queue_id, actor_id_, peer_actor_id,
+                                                   size, GetOutTransport(queue_id));
   upstream_queues_[queue_id] = queue;
 
   return queue;
@@ -321,8 +321,8 @@ std::shared_ptr<ReaderQueue> DownstreamQueueMessageHandler::CreateDownstreamQueu
   }
 
   std::shared_ptr<streaming::ReaderQueue> queue =
-      std::unique_ptr<streaming::ReaderQueue>(new streaming::ReaderQueue(
-          queue_id, actor_id_, peer_actor_id, GetOutTransport(queue_id)));
+      std::make_unique<streaming::ReaderQueue>(queue_id, actor_id_, peer_actor_id,
+                                               GetOutTransport(queue_id));
   downstream_queues_[queue_id] = queue;
   return queue;
 }
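In the last two hunks the result of make_unique initializes a std::shared_ptr:
a unique_ptr rvalue converts implicitly to shared_ptr, transferring ownership,
so make_unique can serve shared-pointer variables too (at the cost of a
separate control-block allocation compared with make_shared). A sketch with a
hypothetical Queue type:

    #include <memory>

    struct Queue {};

    int main() {
      // The unique_ptr returned by make_unique hands its ownership over.
      std::shared_ptr<Queue> q = std::make_unique<Queue>();
      return 0;
    }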