[Core] Encode job ID in randomized task IDs for user-created threads (#19320)

## Why are these changes needed?

Currently, when `WorkerContext::GetCurrentTaskID()` returns a random task ID in user-created threads, and the returned task ID doesn't include the job ID. In this case, subsequent non-actor tasks and return values, and objects created by `ray.put()` don't include the job ID neither. This makes us hard to find the correct job ID from a task or object ID.

This PR updates the task ID generation code to always encode the job ID.

A side-effect of this PR is the change of possibility of task ID collision in user-created threads due to the fixed job ID part. w/o this PR: `sqrt(pi * 256 ^ 12 / 2)` ~= 352 trillion tasks. w/ this PR: `sqrt(pi * 256 ^ 8 / 2)` ~= 5 billion tasks. But this should be OK because the job ID part of task IDs in non-user-created threads are always fixed, so it won't be worse than non-user-created threads.

## Related issue number

## Checks

- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(
This commit is contained in:
Kai Yang 2021-11-08 21:00:40 +08:00 committed by GitHub
parent 547bfbc4a4
commit e84391d1d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 144 additions and 82 deletions

View file

@ -144,8 +144,6 @@ InvocationSpec BuildInvocationSpec1(TaskType task_type,
const ActorID &actor) {
InvocationSpec invocation_spec;
invocation_spec.task_type = task_type;
invocation_spec.task_id =
TaskID::ForFakeTask(); // TODO(SongGuyang): make it from different task
invocation_spec.remote_function_holder = remote_function_holder;
invocation_spec.actor_id = actor;
invocation_spec.args = TransformArgs(args);

View file

@ -15,6 +15,7 @@
#pragma once
#include <ray/api/ray_runtime.h>
#include <msgpack.hpp>
#include "ray/common/id.h"
@ -26,7 +27,6 @@ namespace internal {
class InvocationSpec {
public:
TaskType task_type;
TaskID task_id;
std::string name;
ActorID actor_id;
int actor_counter;

View file

@ -41,13 +41,16 @@ ObjectID LocalModeTaskSubmitter::Submit(InvocationSpec &invocation,
rpc::Address address;
std::unordered_map<std::string, double> required_resources;
std::unordered_map<std::string, double> required_placement_resources;
std::string task_id_data(TaskID::Size(), 0);
FillRandom(&task_id_data);
auto task_id = TaskID::FromBinary(task_id_data);
TaskSpecBuilder builder;
std::string task_name =
invocation.name.empty() ? functionDescriptor->DefaultTaskName() : invocation.name;
// TODO (Alex): Properly set the depth here?
builder.SetCommonTaskSpec(invocation.task_id, task_name, rpc::Language::CPP,
functionDescriptor, local_mode_ray_tuntime_.GetCurrentJobID(),
builder.SetCommonTaskSpec(task_id, task_name, rpc::Language::CPP, functionDescriptor,
local_mode_ray_tuntime_.GetCurrentJobID(),
local_mode_ray_tuntime_.GetCurrentTaskId(), 0,
local_mode_ray_tuntime_.GetCurrentTaskId(), address, 1,
required_resources, required_placement_resources,

View file

@ -60,6 +60,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
CActorID Of(CJobID job_id, CTaskID parent_task_id,
int64_t parent_task_counter)
CJobID JobId()
cdef cppclass CNodeID "ray::NodeID"(CUniqueID):
@staticmethod
@ -109,7 +111,7 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
CTaskID ForDriverTask(const CJobID &job_id)
@staticmethod
CTaskID ForFakeTask()
CTaskID FromRandom(const CJobID &job_id)
@staticmethod
CTaskID ForActorCreationTask(CActorID actor_id)

View file

@ -171,8 +171,9 @@ cdef class TaskID(BaseID):
return CTaskID.Size()
@classmethod
def for_fake_task(cls):
return cls(CTaskID.ForFakeTask().Binary())
def for_fake_task(cls, job_id):
return cls(CTaskID.FromRandom(
CJobID.FromBinary(job_id.binary())).Binary())
@classmethod
def for_driver_task(cls, job_id):
@ -300,6 +301,10 @@ cdef class ActorID(BaseID):
def size(self):
return CActorID.Size()
@property
def job_id(self):
return JobID(self.data.JobId().Binary())
def binary(self):
return self.data.Binary()

View file

@ -348,5 +348,52 @@ def test_listen_on_localhost(ray_start_regular):
assert "127.0.0.1" in connection.laddr.ip
def test_job_id_consistency(ray_start_regular):
@ray.remote
def foo():
return "bar"
@ray.remote
class Foo:
def ping(self):
return "pong"
@ray.remote
def verify_job_id(job_id, new_thread):
def verify():
current_task_id = ray.runtime_context.get_runtime_context().task_id
assert job_id == current_task_id.job_id()
obj1 = foo.remote()
assert job_id == obj1.job_id()
obj2 = ray.put(1)
assert job_id == obj2.job_id()
a = Foo.remote()
assert job_id == a._actor_id.job_id
obj3 = a.ping.remote()
assert job_id == obj3.job_id()
if not new_thread:
verify()
else:
exc = []
def run():
try:
verify()
except BaseException as e:
exc.append(e)
import threading
t = threading.Thread(target=run)
t.start()
t.join()
if len(exc) > 0:
raise exc[0]
job_id = ray.runtime_context.get_runtime_context().job_id
ray.get(verify_job_id.remote(job_id, False))
ray.get(verify_job_id.remote(job_id, True))
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))

View file

@ -15,13 +15,13 @@
#include "ray/common/id.h"
#include <limits.h>
#include <algorithm>
#include <chrono>
#include <mutex>
#include <random>
#include "absl/time/clock.h"
#include "ray/common/constants.h"
#include "ray/common/status.h"
#include "ray/util/macros.h"
@ -160,9 +160,10 @@ TaskID TaskID::ForDriverTask(const JobID &job_id) {
return TaskID::FromBinary(data);
}
TaskID TaskID::ForFakeTask() {
std::string data(kLength, 0);
TaskID TaskID::FromRandom(const JobID &job_id) {
std::string data(kLength - JobID::kLength, 0);
FillRandom(&data);
std::copy_n(job_id.Data(), JobID::kLength, std::back_inserter(data));
return TaskID::FromBinary(data);
}

View file

@ -191,7 +191,7 @@ class TaskID : public BaseID<TaskID> {
static TaskID ForDriverTask(const JobID &job_id);
/// Generate driver task id for the given job.
static TaskID ForFakeTask();
static TaskID FromRandom(const JobID &job_id);
/// Creates a TaskID for an actor creation task.
///

View file

@ -19,8 +19,8 @@ namespace core {
/// per-thread context for core worker.
struct WorkerThreadContext {
WorkerThreadContext()
: current_task_id_(TaskID::ForFakeTask()), task_index_(0), put_counter_(0) {}
explicit WorkerThreadContext(const JobID &job_id)
: current_task_id_(TaskID::FromRandom(job_id)), task_index_(0), put_counter_(0) {}
uint64_t GetNextTaskIndex() { return ++task_index_; }
@ -276,9 +276,9 @@ bool WorkerContext::CurrentActorDetached() const {
return is_detached_actor_;
}
WorkerThreadContext &WorkerContext::GetThreadContext() {
WorkerThreadContext &WorkerContext::GetThreadContext() const {
if (thread_context_ == nullptr) {
thread_context_ = std::make_unique<WorkerThreadContext>();
thread_context_ = std::make_unique<WorkerThreadContext>(current_job_id_);
}
return *thread_context_;

View file

@ -109,7 +109,7 @@ class WorkerContext {
mutable absl::Mutex mutex_;
private:
static WorkerThreadContext &GetThreadContext();
WorkerThreadContext &GetThreadContext() const;
/// Per-thread worker context.
static thread_local std::unique_ptr<WorkerThreadContext> thread_context_;

View file

@ -701,6 +701,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
rpc_address_, local_raylet_client_, core_worker_client_pool_, raylet_client_factory,
std::move(lease_policy), memory_store_, task_manager_, local_raylet_id,
RayConfig::instance().worker_lease_timeout_milliseconds(), actor_creator_,
worker_context_.GetCurrentJobID(),
RayConfig::instance().max_tasks_in_flight_per_worker(),
boost::asio::steady_timer(io_service_),
RayConfig::instance().max_pending_lease_requests_per_scheduling_category());

View file

@ -41,7 +41,7 @@ TaskSpecification CreateActorTaskHelper(ActorID actor_id, WorkerID caller_worker
int64_t counter,
TaskID caller_id = TaskID::Nil()) {
TaskSpecification task;
task.GetMutableMessage().set_task_id(TaskID::ForFakeTask().Binary());
task.GetMutableMessage().set_task_id(TaskID::FromRandom(actor_id.JobId()).Binary());
task.GetMutableMessage().set_caller_id(caller_id.Binary());
task.GetMutableMessage().set_type(TaskType::ACTOR_TASK);
task.GetMutableMessage().mutable_caller_address()->set_worker_id(

View file

@ -43,7 +43,7 @@ class DirectTaskTransportTest : public ::testing::Test {
std::make_shared<CoreWorkerMemoryStore>(), task_finisher,
NodeID::Nil(), /* local_raylet_id */
0, /* lease_timeout_ms */
actor_creator);
actor_creator, JobID::Nil() /* job_id */);
}
TaskSpecification GetCreatingTaskSpec(const ActorID &actor_id) {

View file

@ -520,9 +520,9 @@ TEST(DirectTaskTransportTest, TestSubmitOneTask) {
auto task_finisher = std::make_shared<MockTaskFinisher>();
auto actor_creator = std::make_shared<MockActorCreator>();
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(address, raylet_client, client_pool, nullptr,
lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator);
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil());
TaskSpecification task = BuildEmptyTaskSpec();
@ -561,9 +561,9 @@ TEST(DirectTaskTransportTest, TestRetryTaskApplicationLevelError) {
auto task_finisher = std::make_shared<MockTaskFinisher>();
auto actor_creator = std::make_shared<MockActorCreator>();
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(address, raylet_client, client_pool, nullptr,
lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator);
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil());
TaskSpecification task = BuildEmptyTaskSpec();
task.GetMutableMessage().set_retry_exceptions(true);
@ -608,9 +608,9 @@ TEST(DirectTaskTransportTest, TestHandleTaskFailure) {
auto task_finisher = std::make_shared<MockTaskFinisher>();
auto actor_creator = std::make_shared<MockActorCreator>();
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(address, raylet_client, client_pool, nullptr,
lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator);
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil());
TaskSpecification task = BuildEmptyTaskSpec();
ASSERT_TRUE(submitter.SubmitTask(task).ok());
@ -642,7 +642,7 @@ TEST(DirectTaskTransportTest, TestConcurrentWorkerLeases) {
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, 1, absl::nullopt, 2);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(), 1, absl::nullopt, 2);
TaskSpecification task1 = BuildEmptyTaskSpec();
TaskSpecification task2 = BuildEmptyTaskSpec();
@ -711,7 +711,7 @@ TEST(DirectTaskTransportTest, TestSubmitMultipleTasks) {
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, 1, absl::nullopt, 1);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(), 1, absl::nullopt, 1);
TaskSpecification task1 = BuildEmptyTaskSpec();
TaskSpecification task2 = BuildEmptyTaskSpec();
@ -773,7 +773,7 @@ TEST(DirectTaskTransportTest, TestReuseWorkerLease) {
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, 1, absl::nullopt, 1);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(), 1, absl::nullopt, 1);
TaskSpecification task1 = BuildEmptyTaskSpec();
TaskSpecification task2 = BuildEmptyTaskSpec();
@ -836,7 +836,7 @@ TEST(DirectTaskTransportTest, TestRetryLeaseCancellation) {
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, 1, absl::nullopt, 1);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(), 1, absl::nullopt, 1);
TaskSpecification task1 = BuildEmptyTaskSpec();
TaskSpecification task2 = BuildEmptyTaskSpec();
TaskSpecification task3 = BuildEmptyTaskSpec();
@ -892,9 +892,9 @@ TEST(DirectTaskTransportTest, TestConcurrentCancellationAndSubmission) {
auto task_finisher = std::make_shared<MockTaskFinisher>();
auto actor_creator = std::make_shared<MockActorCreator>();
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(address, raylet_client, client_pool, nullptr,
lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator);
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil());
TaskSpecification task1 = BuildEmptyTaskSpec();
TaskSpecification task2 = BuildEmptyTaskSpec();
TaskSpecification task3 = BuildEmptyTaskSpec();
@ -949,7 +949,7 @@ TEST(DirectTaskTransportTest, TestWorkerNotReusedOnError) {
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, 1, absl::nullopt, 1);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(), 1, absl::nullopt, 1);
TaskSpecification task1 = BuildEmptyTaskSpec();
TaskSpecification task2 = BuildEmptyTaskSpec();
@ -993,9 +993,9 @@ TEST(DirectTaskTransportTest, TestWorkerNotReturnedOnExit) {
auto task_finisher = std::make_shared<MockTaskFinisher>();
auto actor_creator = std::make_shared<MockActorCreator>();
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(address, raylet_client, client_pool, nullptr,
lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator);
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil());
TaskSpecification task1 = BuildEmptyTaskSpec();
ASSERT_TRUE(submitter.SubmitTask(task1).ok());
@ -1040,7 +1040,7 @@ TEST(DirectTaskTransportTest, TestSpillback) {
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, lease_client_factory, lease_policy, store,
task_finisher, NodeID::Nil(), kLongTimeout, actor_creator);
task_finisher, NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil());
TaskSpecification task = BuildEmptyTaskSpec();
ASSERT_TRUE(submitter.SubmitTask(task).ok());
@ -1104,7 +1104,7 @@ TEST(DirectTaskTransportTest, TestSpillbackRoundTrip) {
auto lease_policy = std::make_shared<MockLeasePolicy>(local_raylet_id);
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, lease_client_factory, lease_policy, store,
task_finisher, local_raylet_id, kLongTimeout, actor_creator);
task_finisher, local_raylet_id, kLongTimeout, actor_creator, JobID::Nil());
TaskSpecification task = BuildEmptyTaskSpec();
ASSERT_TRUE(submitter.SubmitTask(task).ok());
@ -1165,7 +1165,7 @@ void TestSchedulingKey(const std::shared_ptr<CoreWorkerMemoryStore> store,
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, 1, absl::nullopt, 1);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(), 1, absl::nullopt, 1);
ASSERT_TRUE(submitter.SubmitTask(same1).ok());
ASSERT_TRUE(submitter.SubmitTask(same2).ok());
@ -1305,7 +1305,7 @@ TEST(DirectTaskTransportTest, TestBacklogReport) {
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, 1, absl::nullopt, 1);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(), 1, absl::nullopt, 1);
TaskSpecification task1 = BuildEmptyTaskSpec();
@ -1365,7 +1365,7 @@ TEST(DirectTaskTransportTest, TestWorkerLeaseTimeout) {
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(),
/*lease_timeout_ms=*/5, actor_creator, 1, absl::nullopt, 1);
/*lease_timeout_ms=*/5, actor_creator, JobID::Nil(), 1, absl::nullopt, 1);
TaskSpecification task1 = BuildEmptyTaskSpec();
TaskSpecification task2 = BuildEmptyTaskSpec();
TaskSpecification task3 = BuildEmptyTaskSpec();
@ -1420,9 +1420,9 @@ TEST(DirectTaskTransportTest, TestKillExecutingTask) {
auto task_finisher = std::make_shared<MockTaskFinisher>();
auto actor_creator = std::make_shared<MockActorCreator>();
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(address, raylet_client, client_pool, nullptr,
lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator);
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil());
TaskSpecification task = BuildEmptyTaskSpec();
ASSERT_TRUE(submitter.SubmitTask(task).ok());
@ -1470,9 +1470,9 @@ TEST(DirectTaskTransportTest, TestKillPendingTask) {
auto task_finisher = std::make_shared<MockTaskFinisher>();
auto actor_creator = std::make_shared<MockActorCreator>();
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(address, raylet_client, client_pool, nullptr,
lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator);
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil());
TaskSpecification task = BuildEmptyTaskSpec();
ASSERT_TRUE(submitter.SubmitTask(task).ok());
@ -1504,9 +1504,9 @@ TEST(DirectTaskTransportTest, TestKillResolvingTask) {
auto task_finisher = std::make_shared<MockTaskFinisher>();
auto actor_creator = std::make_shared<MockActorCreator>();
auto lease_policy = std::make_shared<MockLeasePolicy>();
CoreWorkerDirectTaskSubmitter submitter(address, raylet_client, client_pool, nullptr,
lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator);
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil());
TaskSpecification task = BuildEmptyTaskSpec();
ObjectID obj1 = ObjectID::FromRandom();
task.GetMutableMessage().add_args()->mutable_object_ref()->set_object_id(obj1.Binary());
@ -1544,8 +1544,8 @@ TEST(DirectTaskTransportTest, TestPipeliningConcurrentWorkerLeases) {
uint32_t max_tasks_in_flight_per_worker = 10;
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, max_tasks_in_flight_per_worker,
absl::nullopt, 1);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(),
max_tasks_in_flight_per_worker, absl::nullopt, 1);
// Prepare 20 tasks and save them in a vector.
std::vector<TaskSpecification> tasks;
@ -1619,8 +1619,8 @@ TEST(DirectTaskTransportTest, TestPipeliningReuseWorkerLease) {
uint32_t max_tasks_in_flight_per_worker = 10;
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, max_tasks_in_flight_per_worker,
absl::nullopt, 2);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(),
max_tasks_in_flight_per_worker, absl::nullopt, 2);
// prepare 30 tasks and save them in a vector
std::vector<TaskSpecification> tasks;
@ -1711,8 +1711,8 @@ TEST(DirectTaskTransportTest, TestPipeliningNumberOfWorkersRequested) {
uint32_t max_tasks_in_flight_per_worker = 10;
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, max_tasks_in_flight_per_worker,
absl::nullopt, 1);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(),
max_tasks_in_flight_per_worker, absl::nullopt, 1);
// prepare 30 tasks and save them in a vector
std::vector<TaskSpecification> tasks;
@ -1897,8 +1897,8 @@ TEST(DirectTaskTransportTest, TestStealingTasks) {
uint32_t max_tasks_in_flight_per_worker = 10;
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, max_tasks_in_flight_per_worker,
absl::nullopt, 1);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(),
max_tasks_in_flight_per_worker, absl::nullopt, 1);
// prepare 20 tasks and save them in a vector
std::vector<TaskSpecification> tasks;
@ -2078,8 +2078,8 @@ TEST(DirectTaskTransportTest, TestNoStealingByExpiredWorker) {
uint32_t max_tasks_in_flight_per_worker = 10;
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), 1000, actor_creator, max_tasks_in_flight_per_worker, absl::nullopt,
1);
NodeID::Nil(), 1000, actor_creator, JobID::Nil(), max_tasks_in_flight_per_worker,
absl::nullopt, 1);
// prepare 30 tasks and save them in a vector
std::vector<TaskSpecification> tasks;
@ -2217,8 +2217,8 @@ TEST(DirectTaskTransportTest, TestNoWorkerRequestedIfStealingUnavailable) {
uint32_t max_tasks_in_flight_per_worker = 10;
CoreWorkerDirectTaskSubmitter submitter(
address, raylet_client, client_pool, nullptr, lease_policy, store, task_finisher,
NodeID::Nil(), kLongTimeout, actor_creator, max_tasks_in_flight_per_worker,
absl::nullopt, 2);
NodeID::Nil(), kLongTimeout, actor_creator, JobID::Nil(),
max_tasks_in_flight_per_worker, absl::nullopt, 2);
// prepare 10 tasks and save them in a vector
std::vector<TaskSpecification> tasks;

View file

@ -22,7 +22,7 @@ namespace core {
TaskSpecification CreateFakeTask(std::vector<ObjectID> deps) {
TaskSpecification spec;
spec.GetMutableMessage().set_task_id(TaskID::ForFakeTask().Binary());
spec.GetMutableMessage().set_task_id(TaskID::FromRandom(JobID::FromInt(1)).Binary());
for (auto &dep : deps) {
spec.GetMutableMessage().add_args()->mutable_object_ref()->set_object_id(
dep.Binary());

View file

@ -28,7 +28,7 @@ namespace core {
TaskSpecification CreateTaskHelper(uint64_t num_returns,
std::vector<ObjectID> dependencies) {
TaskSpecification task;
task.GetMutableMessage().set_task_id(TaskID::ForFakeTask().Binary());
task.GetMutableMessage().set_task_id(TaskID::FromRandom(JobID::FromInt(1)).Binary());
task.GetMutableMessage().set_num_returns(num_returns);
for (const ObjectID &dep : dependencies) {
task.GetMutableMessage().add_args()->mutable_object_ref()->set_object_id(

View file

@ -513,7 +513,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
// Create a TaskSpecification with an overwritten TaskID to make sure we don't reuse the
// same TaskID to request a worker
auto resource_spec_msg = scheduling_key_entry.resource_spec.GetMutableMessage();
resource_spec_msg.set_task_id(TaskID::ForFakeTask().Binary());
resource_spec_msg.set_task_id(TaskID::FromRandom(job_id_).Binary());
const TaskSpecification resource_spec = TaskSpecification(resource_spec_msg);
rpc::Address best_node_address;
if (raylet_address == nullptr) {

View file

@ -64,6 +64,7 @@ class CoreWorkerDirectTaskSubmitter {
std::shared_ptr<CoreWorkerMemoryStore> store,
std::shared_ptr<TaskFinisherInterface> task_finisher, NodeID local_raylet_id,
int64_t lease_timeout_ms, std::shared_ptr<ActorCreatorInterface> actor_creator,
const JobID &job_id,
uint32_t max_tasks_in_flight_per_worker =
::RayConfig::instance().max_tasks_in_flight_per_worker(),
absl::optional<boost::asio::steady_timer> cancel_timer = absl::nullopt,
@ -79,6 +80,7 @@ class CoreWorkerDirectTaskSubmitter {
local_raylet_id_(local_raylet_id),
actor_creator_(actor_creator),
client_cache_(core_worker_client_pool),
job_id_(job_id),
max_tasks_in_flight_per_worker_(max_tasks_in_flight_per_worker),
max_pending_lease_requests_per_scheduling_category_(
max_pending_lease_requests_per_scheduling_category),
@ -250,6 +252,9 @@ class CoreWorkerDirectTaskSubmitter {
/// Cache of gRPC clients to other workers.
std::shared_ptr<rpc::CoreWorkerClientPool> client_cache_;
/// The ID of the job.
const JobID job_id_;
// max_tasks_in_flight_per_worker_ limits the number of tasks that can be pipelined to a
// worker using a single lease.
const uint32_t max_tasks_in_flight_per_worker_;

View file

@ -447,7 +447,7 @@ static inline TaskSpecification ExampleTaskSpec(
const ActorID actor_id = ActorID::Nil(), const Language &language = Language::PYTHON,
const JobID &job_id = JOB_ID, const ActorID actor_creation_id = ActorID::Nil(),
const std::vector<std::string> &dynamic_worker_options = {},
const TaskID &task_id = TaskID::ForFakeTask(),
const TaskID &task_id = TaskID::FromRandom(JobID::Nil()),
const std::string serialized_runtime_env = "") {
rpc::TaskSpec message;
message.set_job_id(job_id.Binary());
@ -1231,12 +1231,12 @@ TEST_F(WorkerPoolTest, TestWorkerCappingLaterNWorkersNotOwningObjects) {
TEST_F(WorkerPoolTest, PopWorkerWithRuntimeEnv) {
ASSERT_EQ(worker_pool_->GetProcessSize(), 0);
auto actor_creation_id = ActorID::Of(JOB_ID, TaskID::ForDriverTask(JOB_ID), 1);
const auto actor_creation_task_spec =
ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, actor_creation_id,
{"XXX=YYY"}, TaskID::ForFakeTask(), "{\"uris\": \"XXX\"}");
const auto normal_task_spec =
ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(),
{"XXX=YYY"}, TaskID::ForFakeTask(), "{\"uris\": \"XXX\"}");
const auto actor_creation_task_spec = ExampleTaskSpec(
ActorID::Nil(), Language::PYTHON, JOB_ID, actor_creation_id, {"XXX=YYY"},
TaskID::FromRandom(JobID::Nil()), R"({"uris": "XXX"})");
const auto normal_task_spec = ExampleTaskSpec(
ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(), {"XXX=YYY"},
TaskID::FromRandom(JobID::Nil()), R"({"uris": "XXX"})");
const auto normal_task_spec_without_runtime_env =
ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(), {});
// Pop worker for actor creation task again.
@ -1271,13 +1271,13 @@ TEST_F(WorkerPoolTest, CacheWorkersByRuntimeEnvHash) {
auto actor_creation_id = ActorID::Of(JOB_ID, TaskID::ForDriverTask(JOB_ID), 1);
const auto actor_creation_task_spec_1 = ExampleTaskSpec(
ActorID::Nil(), Language::PYTHON, JOB_ID, actor_creation_id,
/*dynamic_options=*/{}, TaskID::ForFakeTask(), "mock_runtime_env_1");
/*dynamic_options=*/{}, TaskID::FromRandom(JobID::Nil()), "mock_runtime_env_1");
const auto task_spec_1 = ExampleTaskSpec(
ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(),
/*dynamic_options=*/{}, TaskID::ForFakeTask(), "mock_runtime_env_1");
/*dynamic_options=*/{}, TaskID::FromRandom(JobID::Nil()), "mock_runtime_env_1");
const auto task_spec_2 = ExampleTaskSpec(
ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(),
/*dynamic_options=*/{}, TaskID::ForFakeTask(), "mock_runtime_env_2");
/*dynamic_options=*/{}, TaskID::FromRandom(JobID::Nil()), "mock_runtime_env_2");
const WorkerCacheKey env1 = {"mock_runtime_env_1", {}};
const int runtime_env_hash_1 = env1.IntHash();
@ -1425,7 +1425,7 @@ TEST_F(WorkerPoolTest, PopWorkerStatus) {
// Create a task with bad runtime env.
const auto task_spec_with_bad_runtime_env =
ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, job_id, ActorID::Nil(),
{"XXX=YYY"}, TaskID::ForFakeTask(), BAD_RUNTIME_ENV);
{"XXX=YYY"}, TaskID::FromRandom(JobID::Nil()), BAD_RUNTIME_ENV);
popped_worker =
worker_pool_->PopWorkerSync(task_spec_with_bad_runtime_env, true, &status);
// PopWorker failed and the status is `RuntimeEnvCreationFailed`.
@ -1433,9 +1433,9 @@ TEST_F(WorkerPoolTest, PopWorkerStatus) {
ASSERT_EQ(status, PopWorkerStatus::RuntimeEnvCreationFailed);
// Create a task with available runtime env.
const auto task_spec_with_runtime_env =
ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, job_id, ActorID::Nil(),
{"XXX=YYY"}, TaskID::ForFakeTask(), "{\"uris\": \"XXX\"}");
const auto task_spec_with_runtime_env = ExampleTaskSpec(
ActorID::Nil(), Language::PYTHON, job_id, ActorID::Nil(), {"XXX=YYY"},
TaskID::FromRandom(JobID::Nil()), R"({"uris": "XXX"})");
popped_worker = worker_pool_->PopWorkerSync(task_spec_with_runtime_env, true, &status);
// PopWorker success.
ASSERT_NE(popped_worker, nullptr);