2017-03-05 02:05:02 -08:00
|
|
|
#include <limits.h>
|
2016-09-17 00:03:10 -07:00
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
#include "common_protocol.h"
|
2016-09-22 23:15:45 -07:00
|
|
|
|
2016-09-17 00:03:10 -07:00
|
|
|
#include "task.h"
|
2016-11-08 14:46:34 -08:00
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
extern "C" {
|
|
|
|
#include "sha256.h"
|
2016-11-08 14:46:34 -08:00
|
|
|
}
|
|
|
|
|
2017-02-26 00:32:43 -08:00
|
|
|
/**
 * Compute the object ID of one of a task's return values.
 *
 * The result is a copy of the task ID whose leading sizeof(int64_t) bytes are
 * XOR-ed with (return_index + 1); the +1 makes the ID for index 0 differ from
 * the task ID itself.
 *
 * @param task_id ID of the task producing the object.
 * @param return_index Index of the return value; must be >= 0.
 * @return The derived object ID.
 */
ObjectID task_compute_return_id(TaskID task_id, int64_t return_index) {
  /* Here, return_indices need to be >= 0, so we can use negative
   * indices for put. */
  RAY_DCHECK(return_index >= 0);
  /* TODO(rkn): This line requires object and task IDs to be the same size. */
  ObjectID return_id = task_id;
  static_assert(sizeof(ObjectID) >= sizeof(int64_t),
                "ObjectID must be large enough to hold an int64_t prefix");
  /* Go through memcpy instead of dereferencing a casted int64_t pointer:
   * that cast breaks strict aliasing and assumes the ID object is suitably
   * aligned for int64_t. The resulting bytes are the same. */
  int64_t first_bytes;
  memcpy(&first_bytes, &return_id, sizeof(first_bytes));
  /* XOR the first bytes of the object ID with the return index. We add one so
   * the first return ID is not the same as the task ID. */
  first_bytes ^= (return_index + 1);
  memcpy(&return_id, &first_bytes, sizeof(first_bytes));
  return return_id;
}
|
|
|
|
|
2017-02-26 00:32:43 -08:00
|
|
|
/**
 * Compute the object ID for an object created by a put inside a task.
 *
 * The result is a copy of the task ID whose leading sizeof(int64_t) bytes are
 * XOR-ed with (-put_index - 1). The offset is always negative, whereas
 * task_compute_return_id uses strictly positive offsets.
 *
 * @param task_id ID of the task performing the put.
 * @param put_index Index of the put; must be >= 0.
 * @return The derived object ID.
 */
ObjectID task_compute_put_id(TaskID task_id, int64_t put_index) {
  RAY_DCHECK(put_index >= 0);
  /* TODO(pcm): This line requires object and task IDs to be the same size. */
  ObjectID put_id = task_id;
  static_assert(sizeof(ObjectID) >= sizeof(int64_t),
                "ObjectID must be large enough to hold an int64_t prefix");
  /* Go through memcpy instead of dereferencing a casted int64_t pointer:
   * that cast breaks strict aliasing and assumes the ID object is suitably
   * aligned for int64_t. The resulting bytes are the same. */
  int64_t first_bytes;
  memcpy(&first_bytes, &put_id, sizeof(first_bytes));
  /* XOR the first bytes of the object ID with the negated put index. We
   * subtract one so the first put ID is not the same as the task ID. */
  first_bytes ^= (-put_index - 1);
  memcpy(&put_id, &first_bytes, sizeof(first_bytes));
  return put_id;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
class TaskBuilder {
|
|
|
|
public:
|
|
|
|
void Start(UniqueID driver_id,
|
|
|
|
TaskID parent_task_id,
|
|
|
|
int64_t parent_counter,
|
|
|
|
ActorID actor_id,
|
2017-10-19 23:49:59 -07:00
|
|
|
ActorID actor_handle_id,
|
2017-03-05 02:05:02 -08:00
|
|
|
int64_t actor_counter,
|
2017-10-15 16:52:10 -07:00
|
|
|
bool is_actor_checkpoint_method,
|
2017-03-05 02:05:02 -08:00
|
|
|
FunctionID function_id,
|
|
|
|
int64_t num_returns) {
|
|
|
|
driver_id_ = driver_id;
|
|
|
|
parent_task_id_ = parent_task_id;
|
|
|
|
parent_counter_ = parent_counter;
|
|
|
|
actor_id_ = actor_id;
|
2017-10-19 23:49:59 -07:00
|
|
|
actor_handle_id_ = actor_handle_id;
|
2017-03-05 02:05:02 -08:00
|
|
|
actor_counter_ = actor_counter;
|
2017-10-15 16:52:10 -07:00
|
|
|
is_actor_checkpoint_method_ = is_actor_checkpoint_method;
|
2017-03-05 02:05:02 -08:00
|
|
|
function_id_ = function_id;
|
|
|
|
num_returns_ = num_returns;
|
|
|
|
|
|
|
|
/* Compute hashes. */
|
|
|
|
sha256_init(&ctx);
|
|
|
|
sha256_update(&ctx, (BYTE *) &driver_id, sizeof(driver_id));
|
|
|
|
sha256_update(&ctx, (BYTE *) &parent_task_id, sizeof(parent_task_id));
|
|
|
|
sha256_update(&ctx, (BYTE *) &parent_counter, sizeof(parent_counter));
|
|
|
|
sha256_update(&ctx, (BYTE *) &actor_id, sizeof(actor_id));
|
|
|
|
sha256_update(&ctx, (BYTE *) &actor_counter, sizeof(actor_counter));
|
2017-10-15 16:52:10 -07:00
|
|
|
sha256_update(&ctx, (BYTE *) &is_actor_checkpoint_method,
|
|
|
|
sizeof(is_actor_checkpoint_method));
|
2017-03-05 02:05:02 -08:00
|
|
|
sha256_update(&ctx, (BYTE *) &function_id, sizeof(function_id));
|
|
|
|
}
|
|
|
|
|
2017-11-10 16:33:34 -08:00
|
|
|
void NextReferenceArgument(ObjectID object_ids[], int num_object_ids) {
|
|
|
|
args.push_back(
|
|
|
|
CreateArg(fbb, to_flatbuf(fbb, &object_ids[0], num_object_ids)));
|
|
|
|
sha256_update(&ctx, (BYTE *) &object_ids[0],
|
|
|
|
sizeof(object_ids[0]) * num_object_ids);
|
2017-03-05 02:05:02 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
void NextValueArgument(uint8_t *value, int64_t length) {
|
|
|
|
auto arg = fbb.CreateString((const char *) value, length);
|
2017-11-10 16:33:34 -08:00
|
|
|
auto empty_ids = fbb.CreateVectorOfStrings({});
|
|
|
|
args.push_back(CreateArg(fbb, empty_ids, arg));
|
2017-07-26 10:08:38 -07:00
|
|
|
sha256_update(&ctx, (BYTE *) value, length);
|
2017-03-05 02:05:02 -08:00
|
|
|
}
|
|
|
|
|
2017-12-01 11:41:40 -08:00
|
|
|
void SetRequiredResource(const std::string &resource_name, double value) {
|
|
|
|
CHECK(resource_map_.count(resource_name) == 0);
|
|
|
|
resource_map_[resource_name] = value;
|
2016-11-08 14:46:34 -08:00
|
|
|
}
|
2016-09-17 00:03:10 -07:00
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
uint8_t *Finish(int64_t *size) {
|
|
|
|
/* Add arguments. */
|
|
|
|
auto arguments = fbb.CreateVector(args);
|
|
|
|
/* Update hash. */
|
|
|
|
BYTE buff[DIGEST_SIZE];
|
|
|
|
sha256_final(&ctx, buff);
|
|
|
|
TaskID task_id;
|
|
|
|
CHECK(sizeof(task_id) <= DIGEST_SIZE);
|
|
|
|
memcpy(&task_id, buff, sizeof(task_id));
|
|
|
|
/* Add return object IDs. */
|
|
|
|
std::vector<flatbuffers::Offset<flatbuffers::String>> returns;
|
|
|
|
for (int64_t i = 0; i < num_returns_; i++) {
|
|
|
|
ObjectID return_id = task_compute_return_id(task_id, i);
|
|
|
|
returns.push_back(to_flatbuf(fbb, return_id));
|
|
|
|
}
|
|
|
|
/* Create TaskInfo. */
|
|
|
|
auto message = CreateTaskInfo(
|
|
|
|
fbb, to_flatbuf(fbb, driver_id_), to_flatbuf(fbb, task_id),
|
|
|
|
to_flatbuf(fbb, parent_task_id_), parent_counter_,
|
2017-10-19 23:49:59 -07:00
|
|
|
to_flatbuf(fbb, actor_id_), to_flatbuf(fbb, actor_handle_id_),
|
|
|
|
actor_counter_, is_actor_checkpoint_method_,
|
2017-03-05 02:05:02 -08:00
|
|
|
to_flatbuf(fbb, function_id_), arguments, fbb.CreateVector(returns),
|
2017-12-01 11:41:40 -08:00
|
|
|
map_to_flatbuf(fbb, resource_map_));
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Finish the TaskInfo. */
|
|
|
|
fbb.Finish(message);
|
|
|
|
*size = fbb.GetSize();
|
|
|
|
uint8_t *result = (uint8_t *) malloc(*size);
|
|
|
|
memcpy(result, fbb.GetBufferPointer(), *size);
|
|
|
|
fbb.Clear();
|
|
|
|
args.clear();
|
2017-12-01 11:41:40 -08:00
|
|
|
resource_map_.clear();
|
2017-03-05 02:05:02 -08:00
|
|
|
return result;
|
2016-11-08 14:46:34 -08:00
|
|
|
}
|
2017-03-05 02:05:02 -08:00
|
|
|
|
|
|
|
private:
|
|
|
|
flatbuffers::FlatBufferBuilder fbb;
|
|
|
|
std::vector<flatbuffers::Offset<Arg>> args;
|
|
|
|
SHA256_CTX ctx;
|
|
|
|
|
|
|
|
/* Data for the builder. */
|
|
|
|
UniqueID driver_id_;
|
|
|
|
TaskID parent_task_id_;
|
|
|
|
int64_t parent_counter_;
|
|
|
|
ActorID actor_id_;
|
2017-10-19 23:49:59 -07:00
|
|
|
ActorID actor_handle_id_;
|
2017-03-05 02:05:02 -08:00
|
|
|
int64_t actor_counter_;
|
2017-10-15 16:52:10 -07:00
|
|
|
bool is_actor_checkpoint_method_;
|
2017-03-05 02:05:02 -08:00
|
|
|
FunctionID function_id_;
|
|
|
|
int64_t num_returns_;
|
2017-12-01 11:41:40 -08:00
|
|
|
std::unordered_map<std::string, double> resource_map_;
|
2017-03-05 02:05:02 -08:00
|
|
|
};
|
|
|
|
|
|
|
|
/* Allocate a new TaskBuilder with `new`; release it with
 * free_task_builder. */
TaskBuilder *make_task_builder(void) {
  TaskBuilder *builder = new TaskBuilder();
  return builder;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Destroy a TaskBuilder with `delete`. */
void free_task_builder(TaskBuilder *builder) {
  delete builder;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return true when the two task IDs compare equal. */
bool TaskID_equal(TaskID first_id, TaskID second_id) {
  const bool ids_match = (first_id == second_id);
  return ids_match;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return whether the task ID reports itself as nil. */
bool TaskID_is_nil(TaskID id) {
  const bool nil = id.is_nil();
  return nil;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return true when the two actor IDs compare equal. */
bool ActorID_equal(ActorID first_id, ActorID second_id) {
  const bool ids_match = (first_id == second_id);
  return ids_match;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return true when the two function IDs compare equal. */
bool FunctionID_equal(FunctionID first_id, FunctionID second_id) {
  const bool ids_match = (first_id == second_id);
  return ids_match;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return whether the function ID reports itself as nil. */
bool FunctionID_is_nil(FunctionID id) {
  const bool nil = id.is_nil();
  return nil;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Functions for building tasks. */
|
|
|
|
|
|
|
|
void TaskSpec_start_construct(TaskBuilder *builder,
|
|
|
|
UniqueID driver_id,
|
|
|
|
TaskID parent_task_id,
|
|
|
|
int64_t parent_counter,
|
|
|
|
ActorID actor_id,
|
2017-10-19 23:49:59 -07:00
|
|
|
ActorID actor_handle_id,
|
2017-03-05 02:05:02 -08:00
|
|
|
int64_t actor_counter,
|
2017-10-15 16:52:10 -07:00
|
|
|
bool is_actor_checkpoint_method,
|
2017-03-05 02:05:02 -08:00
|
|
|
FunctionID function_id,
|
|
|
|
int64_t num_returns) {
|
|
|
|
builder->Start(driver_id, parent_task_id, parent_counter, actor_id,
|
2017-10-19 23:49:59 -07:00
|
|
|
actor_handle_id, actor_counter, is_actor_checkpoint_method,
|
|
|
|
function_id, num_returns);
|
2016-09-17 00:03:10 -07:00
|
|
|
}
|
|
|
|
|
2017-12-26 16:22:04 -08:00
|
|
|
/* Finish the task under construction and return the builder's buffer viewed
 * as a TaskSpec. The buffer length is written to *size. */
TaskSpec *TaskSpec_finish_construct(TaskBuilder *builder, int64_t *size) {
  uint8_t *serialized = builder->Finish(size);
  return reinterpret_cast<TaskSpec *>(serialized);
}
|
|
|
|
|
2017-11-10 16:33:34 -08:00
|
|
|
/* Append a by-reference argument, made of num_object_ids object IDs, to the
 * task being built. */
void TaskSpec_args_add_ref(TaskBuilder *builder,
                           ObjectID object_ids[],
                           int num_object_ids) {
  ObjectID *first_id = object_ids;
  builder->NextReferenceArgument(first_id, num_object_ids);
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Append a by-value argument of `length` bytes to the task being built. */
void TaskSpec_args_add_val(TaskBuilder *builder,
                           uint8_t *value,
                           int64_t length) {
  builder->NextValueArgument(value, length);
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
void TaskSpec_set_required_resource(TaskBuilder *builder,
|
2017-12-01 11:41:40 -08:00
|
|
|
const std::string &resource_name,
|
2017-03-05 02:05:02 -08:00
|
|
|
double value) {
|
2017-12-01 11:41:40 -08:00
|
|
|
builder->SetRequiredResource(resource_name, value);
|
2016-09-17 00:03:10 -07:00
|
|
|
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Functions for reading tasks. */
|
|
|
|
|
2017-12-26 16:22:04 -08:00
|
|
|
/* Read the task ID out of a serialized task spec. */
TaskID TaskSpec_task_id(const TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *serialized_id = task_info->task_id();
  return from_flatbuf(*serialized_id);
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Read the function ID out of a serialized task spec. */
FunctionID TaskSpec_function(TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *serialized_id = task_info->function_id();
  return from_flatbuf(*serialized_id);
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Read the actor ID out of a serialized task spec. */
ActorID TaskSpec_actor_id(TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *serialized_id = task_info->actor_id();
  return from_flatbuf(*serialized_id);
}
|
|
|
|
|
2017-10-19 23:49:59 -07:00
|
|
|
/* Read the actor handle ID out of a serialized task spec. */
ActorID TaskSpec_actor_handle_id(TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *serialized_id = task_info->actor_handle_id();
  return from_flatbuf(*serialized_id);
}
|
|
|
|
|
2017-10-13 20:52:11 -07:00
|
|
|
/* A task is an actor task exactly when its actor ID is not nil. */
bool TaskSpec_is_actor_task(TaskSpec *spec) {
  const bool actor_id_is_nil = TaskSpec_actor_id(spec).is_nil();
  return !actor_id_is_nil;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return the absolute value of the actor counter stored in the spec. */
int64_t TaskSpec_actor_counter(TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto stored_counter = task_info->actor_counter();
  return std::abs(stored_counter);
}
|
|
|
|
|
2017-10-15 16:52:10 -07:00
|
|
|
/* Return the is_actor_checkpoint_method flag stored in the spec. */
bool TaskSpec_is_actor_checkpoint_method(TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const bool is_checkpoint = task_info->is_actor_checkpoint_method();
  return is_checkpoint;
}
|
|
|
|
|
2017-10-13 20:52:11 -07:00
|
|
|
/* Decide whether the argument at arg_index is the actor dummy object.
 * Returns false for a task whose actor counter is 0 and for checkpoint
 * tasks; otherwise only the last argument qualifies. */
bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index) {
  /* The first task does not have any dependencies. */
  if (TaskSpec_actor_counter(spec) == 0) {
    return false;
  }
  /* Checkpoint tasks do not have any dependencies. */
  if (TaskSpec_is_actor_checkpoint_method(spec)) {
    return false;
  }
  /* For all other tasks, the last argument is the dummy object. */
  const int64_t last_arg_index = TaskSpec_num_args(spec) - 1;
  return arg_index == last_arg_index;
}
|
|
|
|
|
2017-12-26 16:22:04 -08:00
|
|
|
/* Read the driver ID out of a serialized task spec. */
UniqueID TaskSpec_driver_id(const TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *serialized_id = task_info->driver_id();
  return from_flatbuf(*serialized_id);
}
|
|
|
|
|
2017-12-26 16:22:04 -08:00
|
|
|
/* Read the parent task ID out of a serialized task spec. */
TaskID TaskSpec_parent_task_id(const TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *serialized_id = task_info->parent_task_id();
  return from_flatbuf(*serialized_id);
}
|
|
|
|
|
|
|
|
/* Read the parent counter out of a serialized task spec. */
int64_t TaskSpec_parent_counter(TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto counter = task_info->parent_counter();
  return counter;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return the number of arguments stored in the task spec. */
int64_t TaskSpec_num_args(TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *arguments = task_info->args();
  return arguments->size();
}
|
|
|
|
|
2017-12-14 20:47:54 -08:00
|
|
|
/* Count how many of the task's arguments are passed by reference. */
int64_t TaskSpec_num_args_by_ref(TaskSpec *spec) {
  const int64_t total_args = TaskSpec_num_args(spec);
  int64_t by_ref_count = 0;
  for (int64_t arg_index = 0; arg_index < total_args; arg_index++) {
    by_ref_count += TaskSpec_arg_by_ref(spec, arg_index) ? 1 : 0;
  }
  return by_ref_count;
}
|
|
|
|
|
2017-11-10 16:33:34 -08:00
|
|
|
/* Return how many object IDs the argument at arg_index holds. */
int TaskSpec_arg_id_count(TaskSpec *spec, int64_t arg_index) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *argument = task_info->args()->Get(arg_index);
  return argument->object_ids()->size();
}
|
|
|
|
|
|
|
|
/* Return object ID number id_index of the argument at arg_index. */
ObjectID TaskSpec_arg_id(TaskSpec *spec, int64_t arg_index, int64_t id_index) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *argument = task_info->args()->Get(arg_index);
  const auto *serialized_id = argument->object_ids()->Get(id_index);
  return from_flatbuf(*serialized_id);
}
|
2016-09-18 18:06:42 -07:00
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return a pointer to the by-value data of the argument at arg_index. The
 * pointer refers to storage inside the spec buffer itself. */
const uint8_t *TaskSpec_arg_val(TaskSpec *spec, int64_t arg_index) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *argument = task_info->args()->Get(arg_index);
  const char *raw_bytes = argument->data()->c_str();
  return reinterpret_cast<const uint8_t *>(raw_bytes);
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return the size of the by-value data of the argument at arg_index. */
int64_t TaskSpec_arg_length(TaskSpec *spec, int64_t arg_index) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *argument = task_info->args()->Get(arg_index);
  return argument->data()->size();
}
|
2016-09-22 23:15:45 -07:00
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return the number of return object IDs stored in the task spec. */
int64_t TaskSpec_num_returns(TaskSpec *spec) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *return_ids = task_info->returns();
  return return_ids->size();
}
|
|
|
|
|
|
|
|
/* An argument is passed by reference when its object ID list is
 * non-empty. */
bool TaskSpec_arg_by_ref(TaskSpec *spec, int64_t arg_index) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *argument = task_info->args()->Get(arg_index);
  const auto id_count = argument->object_ids()->size();
  return id_count != 0;
}
|
|
|
|
|
|
|
|
/* Return the object ID of the task's return value at return_index. */
ObjectID TaskSpec_return(TaskSpec *spec, int64_t return_index) {
  CHECK(spec);
  const TaskInfo *task_info = flatbuffers::GetRoot<TaskInfo>(spec);
  const auto *serialized_id = task_info->returns()->Get(return_index);
  return from_flatbuf(*serialized_id);
}
|
2016-09-29 21:12:06 -07:00
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
double TaskSpec_get_required_resource(const TaskSpec *spec,
|
2017-12-01 11:41:40 -08:00
|
|
|
const std::string &resource_name) {
|
|
|
|
// This is a bit ugly. However it shouldn't be much of a performance issue
|
|
|
|
// because there shouldn't be many distinct resources in a single task spec.
|
|
|
|
CHECK(spec);
|
|
|
|
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
|
|
|
|
for (size_t i = 0; i < message->required_resources()->size(); i++) {
|
|
|
|
const ResourcePair *resource_pair = message->required_resources()->Get(i);
|
|
|
|
if (string_from_flatbuf(*resource_pair->key()) == resource_name) {
|
|
|
|
return resource_pair->value();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
const std::unordered_map<std::string, double> TaskSpec_get_required_resources(
|
|
|
|
const TaskSpec *spec) {
|
2017-03-05 02:05:02 -08:00
|
|
|
CHECK(spec);
|
|
|
|
auto message = flatbuffers::GetRoot<TaskInfo>(spec);
|
2017-12-01 11:41:40 -08:00
|
|
|
return map_from_flatbuf(*message->required_resources());
|
2017-03-05 02:05:02 -08:00
|
|
|
}
|
|
|
|
|
2017-12-14 20:47:54 -08:00
|
|
|
/**
 * Make a byte-for-byte copy of a serialized task spec.
 *
 * @param spec The spec to copy.
 * @param task_spec_size Size of the spec in bytes.
 * @return A malloc-allocated copy; release it with TaskSpec_free.
 */
TaskSpec *TaskSpec_copy(TaskSpec *spec, int64_t task_spec_size) {
  TaskSpec *copy = (TaskSpec *) malloc(task_spec_size);
  /* Fail loudly on allocation failure rather than copying into NULL. A NULL
   * result is tolerated only for a zero-byte request, where malloc is
   * allowed to return it. */
  CHECK(copy != NULL || task_spec_size == 0);
  if (task_spec_size > 0) {
    memcpy(copy, spec, task_spec_size);
  }
  return copy;
}
|
|
|
|
|
|
|
|
/* Release a task spec by passing it to free(). */
void TaskSpec_free(TaskSpec *spec) {
  free(spec);
}
|
|
|
|
|
|
|
|
/* Construct an execution spec from a list of execution dependencies and a
 * serialized task spec. The spec bytes are copied into storage owned by this
 * object. */
TaskExecutionSpec::TaskExecutionSpec(
    const std::vector<ObjectID> &execution_dependencies,
    TaskSpec *spec,
    int64_t task_spec_size) {
  execution_dependencies_ = execution_dependencies;
  task_spec_size_ = task_spec_size;
  /* Allocate the owned buffer, fill it, then hand it over to spec_. */
  std::unique_ptr<TaskSpec[]> owned_copy(new TaskSpec[task_spec_size_]);
  memcpy(owned_copy.get(), spec, task_spec_size);
  spec_ = std::move(owned_copy);
}
|
|
|
|
|
|
|
|
/* Construct an execution spec as a deep copy of *other: the dependency list
 * is copied and the spec bytes are duplicated into a fresh buffer. */
TaskExecutionSpec::TaskExecutionSpec(TaskExecutionSpec *other) {
  execution_dependencies_ = other->execution_dependencies_;
  task_spec_size_ = other->task_spec_size_;
  /* Allocate the owned buffer, fill it, then hand it over to spec_. */
  std::unique_ptr<TaskSpec[]> owned_copy(new TaskSpec[task_spec_size_]);
  memcpy(owned_copy.get(), other->spec_.get(), task_spec_size_);
  spec_ = std::move(owned_copy);
}
|
|
|
|
|
|
|
|
std::vector<ObjectID> TaskExecutionSpec::ExecutionDependencies() {
|
|
|
|
return execution_dependencies_;
|
|
|
|
}
|
|
|
|
|
|
|
|
int64_t TaskExecutionSpec::SpecSize() {
|
|
|
|
return task_spec_size_;
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Return a non-owning pointer to the task spec held by this object. */
TaskSpec *TaskExecutionSpec::Spec() {
  TaskSpec *owned_spec = spec_.get();
  return owned_spec;
}
|
|
|
|
|
|
|
|
int64_t TaskExecutionSpec::NumDependencies() {
|
|
|
|
TaskSpec *spec = Spec();
|
|
|
|
int64_t num_dependencies = TaskSpec_num_args(spec);
|
|
|
|
num_dependencies += execution_dependencies_.size();
|
|
|
|
return num_dependencies;
|
|
|
|
}
|
|
|
|
|
|
|
|
int TaskExecutionSpec::DependencyIdCount(int64_t dependency_index) {
|
|
|
|
TaskSpec *spec = Spec();
|
|
|
|
/* The first dependencies are the arguments of the task itself, followed by
|
|
|
|
* the execution dependencies. Find the total number of task arguments so
|
|
|
|
* that we can index into the correct list. */
|
|
|
|
int64_t num_args = TaskSpec_num_args(spec);
|
|
|
|
if (dependency_index < num_args) {
|
|
|
|
/* Index into the task arguments. */
|
|
|
|
return TaskSpec_arg_id_count(spec, dependency_index);
|
|
|
|
} else {
|
|
|
|
/* Index into the execution dependencies. */
|
|
|
|
dependency_index -= num_args;
|
|
|
|
CHECK((size_t) dependency_index < execution_dependencies_.size());
|
|
|
|
/* All elements in the execution dependency list have exactly one ID. */
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/* Return object ID number id_index of the dependency at dependency_index.
 * Dependencies are numbered with the task arguments first, followed by the
 * execution dependencies; id_index is only consulted for task arguments. */
ObjectID TaskExecutionSpec::DependencyId(int64_t dependency_index,
                                         int64_t id_index) {
  TaskSpec *task_spec = Spec();
  /* Find the total number of task arguments so that we can index into the
   * correct list. */
  const int64_t num_args = TaskSpec_num_args(task_spec);
  if (dependency_index >= num_args) {
    /* Index into the execution dependencies. */
    dependency_index -= num_args;
    CHECK((size_t) dependency_index < execution_dependencies_.size());
    return execution_dependencies_[dependency_index];
  }
  /* Index into the task arguments. */
  return TaskSpec_arg_id(task_spec, dependency_index, id_index);
}
|
|
|
|
|
|
|
|
/**
 * Check whether this task depends on the given object.
 *
 * @param object_id The object ID to look for.
 * @return true if object_id appears among the object IDs of any task
 *         argument or among the execution dependencies, false otherwise.
 */
bool TaskExecutionSpec::DependsOn(ObjectID object_id) {
  // Iterate through the task arguments to see if it contains object_id.
  TaskSpec *spec = Spec();
  const int64_t num_args = TaskSpec_num_args(spec);
  // The index has the same type as num_args, so the comparison can neither
  // narrow nor overflow.
  for (int64_t i = 0; i < num_args; ++i) {
    const int count = TaskSpec_arg_id_count(spec, i);
    for (int j = 0; j < count; j++) {
      const ObjectID arg_id = TaskSpec_arg_id(spec, i, j);
      if (arg_id == object_id) {
        return true;
      }
    }
  }
  // Iterate through the execution dependencies to see if it contains
  // object_id. Bind by reference to avoid copying each ID.
  for (const auto &dependency_id : execution_dependencies_) {
    if (dependency_id == object_id) {
      return true;
    }
  }
  // The requested object ID was not a task argument or an execution
  // dependency. This task is not dependent on it.
  return false;
}
|
|
|
|
|
2017-12-14 20:47:54 -08:00
|
|
|
bool TaskExecutionSpec::IsStaticDependency(int64_t dependency_index) {
|
|
|
|
TaskSpec *spec = Spec();
|
|
|
|
/* The first dependencies are the arguments of the task itself, followed by
|
|
|
|
* the execution dependencies. If the requested dependency index is a task
|
|
|
|
* argument, then it is a task dependency. */
|
|
|
|
int64_t num_args = TaskSpec_num_args(spec);
|
|
|
|
return (dependency_index < num_args);
|
2017-03-05 02:05:02 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
/* TASK INSTANCES */
|
|
|
|
|
|
|
|
/* Allocate a Task from a serialized spec and its execution dependencies. A
 * new TaskExecutionSpec is built from the arguments and owned by the task.
 * The task is created with `new`. */
Task *Task_alloc(TaskSpec *spec,
                 int64_t task_spec_size,
                 int state,
                 DBClientID local_scheduler_id,
                 const std::vector<ObjectID> &execution_dependencies) {
  Task *task = new Task();
  task->execution_spec = std::unique_ptr<TaskExecutionSpec>(
      new TaskExecutionSpec(execution_dependencies, spec, task_spec_size));
  task->state = state;
  task->local_scheduler_id = local_scheduler_id;
  return task;
}
|
|
|
|
|
|
|
|
/* Allocate a Task that owns a copy of an existing execution spec. The task
 * is created with `new`. */
Task *Task_alloc(TaskExecutionSpec &execution_spec,
                 int state,
                 DBClientID local_scheduler_id) {
  Task *task = new Task();
  TaskExecutionSpec *spec_copy = new TaskExecutionSpec(&execution_spec);
  task->execution_spec = std::unique_ptr<TaskExecutionSpec>(spec_copy);
  task->state = state;
  task->local_scheduler_id = local_scheduler_id;
  return task;
}
|
|
|
|
|
2017-02-26 00:32:43 -08:00
|
|
|
/* Create a new Task with the same execution spec contents, state and local
 * scheduler ID as `other`. */
Task *Task_copy(Task *other) {
  TaskExecutionSpec *source_spec = Task_task_execution_spec(other);
  return Task_alloc(*source_spec, other->state, other->local_scheduler_id);
}
|
|
|
|
|
2017-02-26 00:32:43 -08:00
|
|
|
/* Return sizeof(Task) - sizeof(TaskSpec) plus the size of the task's spec.
 * NOTE(review): subtracting sizeof(TaskSpec) looks like it assumes a layout
 * with the spec embedded in Task -- confirm this is still what callers
 * expect. */
int64_t Task_size(Task *task_arg) {
  const int64_t spec_bytes = task_arg->execution_spec->SpecSize();
  return sizeof(Task) - sizeof(TaskSpec) + spec_bytes;
}
|
|
|
|
|
2017-02-26 00:32:43 -08:00
|
|
|
/* Return the task's state field. */
int Task_state(Task *task) {
  const int current_state = task->state;
  return current_state;
}
|
|
|
|
|
2017-02-26 00:32:43 -08:00
|
|
|
/* Overwrite the task's state field. */
void Task_set_state(Task *task, int state) {
  task->state = state;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Return the local scheduler ID stored on the task. */
DBClientID Task_local_scheduler(Task *task) {
  DBClientID scheduler_id = task->local_scheduler_id;
  return scheduler_id;
}
|
|
|
|
|
2017-03-05 02:05:02 -08:00
|
|
|
/* Overwrite the local scheduler ID stored on the task. */
void Task_set_local_scheduler(Task *task, DBClientID local_scheduler_id) {
  task->local_scheduler_id = local_scheduler_id;
}
|
|
|
|
|
2017-12-14 20:47:54 -08:00
|
|
|
/* Return a non-owning pointer to the task's execution spec. */
TaskExecutionSpec *Task_task_execution_spec(Task *task) {
  TaskExecutionSpec *execution_spec = task->execution_spec.get();
  return execution_spec;
}
|
|
|
|
|
2017-02-26 00:32:43 -08:00
|
|
|
/* Return the task ID recorded in the task's serialized spec. */
TaskID Task_task_id(Task *task) {
  TaskSpec *task_spec = Task_task_execution_spec(task)->Spec();
  return TaskSpec_task_id(task_spec);
}
|
|
|
|
|
2017-02-26 00:32:43 -08:00
|
|
|
/* Destroy a task with `delete`. */
void Task_free(Task *task) {
  delete task;
}
|