Revert SubmitTask over grpc, use RayletConnection instead (#6305)

* Revert SubmitTask over grpc

* comment
This commit is contained in:
Stephanie Wang 2019-11-27 17:28:12 -08:00 committed by Edward Oakes
parent 22b305223a
commit 31a0b11e16
7 changed files with 32 additions and 53 deletions

View file

@ -4,14 +4,6 @@ package ray.rpc;
import "src/ray/protobuf/common.proto";
// Request to submit a task for execution.
message SubmitTaskRequest {
// Specification of the task being submitted.
TaskSpec task_spec = 1;
}
// Reply to a SubmitTaskRequest. Carries no fields.
message SubmitTaskReply {
}
// Request a worker from the raylet with the specified resources.
message WorkerLeaseRequest {
// TaskSpec containing the requested resources.
@ -90,8 +82,6 @@ message NodeStatsReply {
// Service for inter-node-manager communication.
service NodeManagerService {
// Submit a task (from a local or remote worker) to the node manager.
rpc SubmitTask(SubmitTaskRequest) returns (SubmitTaskReply);
// Request a worker from the raylet.
rpc RequestWorkerLease(WorkerLeaseRequest) returns (WorkerLeaseReply);
// Release a worker back to its raylet.

View file

@ -5,9 +5,12 @@
namespace ray.protocol;
enum MessageType:int {
// Task is submitted to the raylet. This is sent from a worker to a
// raylet.
SubmitTask = 1,
// Notify the raylet that a task has finished. This is sent from a
// worker to a raylet.
TaskDone = 1,
TaskDone,
// Log a message to the event table. This is sent from a worker to a raylet.
EventLogMessage,
// Send an initial connection message to the raylet. This is sent
@ -96,6 +99,10 @@ table Task {
task_execution_spec: TaskExecutionSpecification;
}
// Request to submit a task to the raylet.
table SubmitTaskRequest {
// The serialized task specification, carried as a byte string.
task_spec: string;
}
// This message describes a given resource that is reserved for a worker.
table ResourceIdSetInfo {
// The name of the resource.

View file

@ -900,6 +900,12 @@ void NodeManager::ProcessClientMessage(
// because it's already disconnected.
return;
} break;
case protocol::MessageType::SubmitTask: {
// For tasks submitted via the raylet path, we must make sure to order the
// task submission so that tasks are always submitted after the tasks that
// they depend on.
ProcessSubmitTaskMessage(message_data);
} break;
case protocol::MessageType::SetResourceRequest: {
ProcessSetResourceRequest(client, message_data);
} break;
@ -1408,21 +1414,16 @@ void NodeManager::ProcessReportActiveObjectIDs(
unordered_set_from_flatbuf<ObjectID>(*message->object_ids()));
}
void NodeManager::HandleSubmitTask(const rpc::SubmitTaskRequest &request,
rpc::SubmitTaskReply *reply,
rpc::SendReplyCallback send_reply_callback) {
rpc::Task task;
task.mutable_task_spec()->CopyFrom(request.task_spec());
// Set the caller's node ID.
if (task.task_spec().caller_address().raylet_id() == "") {
task.mutable_task_spec()->mutable_caller_address()->set_raylet_id(
gcs_client_->client_table().GetLocalClientId().Binary());
}
void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) {
// Read the task submitted by the client.
auto fbs_message = flatbuffers::GetRoot<protocol::SubmitTaskRequest>(message_data);
rpc::Task task_message;
RAY_CHECK(task_message.mutable_task_spec()->ParseFromArray(
fbs_message->task_spec()->data(), fbs_message->task_spec()->size()));
// Submit the task to the raylet. Since the task was submitted
// locally, there is no uncommitted lineage.
SubmitTask(Task(task), Lineage());
send_reply_callback(Status::OK(), nullptr, nullptr);
SubmitTask(Task(task_message), Lineage());
}
void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &request,

View file

@ -395,6 +395,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return True if the invariants are satisfied and false otherwise.
bool CheckDependencyManagerInvariant() const;
/// Process a SubmitTask message sent by a client. Parses the task
/// specification out of the message and submits the task to the raylet.
///
/// \param message_data A pointer to the message data.
/// \return Void.
void ProcessSubmitTaskMessage(const uint8_t *message_data);
/// Process client message of RegisterClientRequest
///
/// \param client The client that sent the message.
@ -510,11 +516,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return void.
void FinishAssignTask(const TaskID &task_id, Worker &worker, bool success);
/// Handle a `SubmitTask` request.
///
/// \param request The request carrying the task spec to submit.
/// \param reply The reply message.
/// \param send_reply_callback The callback invoked once the task has been submitted.
void HandleSubmitTask(const rpc::SubmitTaskRequest &request,
rpc::SubmitTaskReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle a `WorkerLease` request.
void HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &request,
rpc::WorkerLeaseReply *reply,

View file

@ -229,7 +229,6 @@ RayletClient::RayletClient(std::shared_ptr<ray::rpc::NodeManagerWorkerClient> gr
}
ray::Status RayletClient::SubmitTask(const ray::TaskSpecification &task_spec) {
ray::rpc::SubmitTaskRequest request;
for (size_t i = 0; i < task_spec.NumArgs(); i++) {
if (task_spec.ArgByRef(i)) {
for (size_t j = 0; j < task_spec.ArgIdCount(i); j++) {
@ -238,8 +237,11 @@ ray::Status RayletClient::SubmitTask(const ray::TaskSpecification &task_spec) {
}
}
}
request.mutable_task_spec()->CopyFrom(task_spec.GetMessage());
return grpc_client_->SubmitTask(request, /*callback=*/nullptr);
flatbuffers::FlatBufferBuilder fbb;
auto message = ray::protocol::CreateSubmitTaskRequest(
fbb, fbb.CreateString(task_spec.Serialize()));
fbb.Finish(message);
return conn_->WriteMessage(MessageType::SubmitTask, &fbb);
}
ray::Status RayletClient::TaskDone() {

View file

@ -73,16 +73,6 @@ class NodeManagerWorkerClient
return std::shared_ptr<NodeManagerWorkerClient>(instance);
}
/// Submit a task.
///
/// \param request The request message to send.
/// \param callback The callback handed to the created client call.
/// \return The status reported by the created call.
ray::Status SubmitTask(const SubmitTaskRequest &request,
                       const ClientCallback<SubmitTaskReply> &callback) {
  return client_call_manager_
      .CreateCall<NodeManagerService, SubmitTaskRequest, SubmitTaskReply>(
          *stub_, &NodeManagerService::Stub::PrepareAsyncSubmitTask, request, callback)
      ->GetStatus();
}
/// Request a worker lease.
ray::Status RequestWorkerLease(const WorkerLeaseRequest &request,
const ClientCallback<WorkerLeaseReply> &callback) {

View file

@ -24,9 +24,6 @@ class NodeManagerServiceHandler {
/// \param[out] reply The reply message.
/// \param[in] send_reply_callback The callback to be called when the request is done.
virtual void HandleSubmitTask(const SubmitTaskRequest &request, SubmitTaskReply *reply,
SendReplyCallback send_reply_callback) = 0;
virtual void HandleWorkerLeaseRequest(const WorkerLeaseRequest &request,
WorkerLeaseReply *reply,
SendReplyCallback send_reply_callback) = 0;
@ -63,13 +60,6 @@ class NodeManagerGrpcService : public GrpcService {
std::vector<std::pair<std::unique_ptr<ServerCallFactory>, int>>
*server_call_factories_and_concurrencies) override {
// Initialize the factory for requests.
std::unique_ptr<ServerCallFactory> submit_task_call_factory(
new ServerCallFactoryImpl<NodeManagerService, NodeManagerServiceHandler,
SubmitTaskRequest, SubmitTaskReply>(
service_, &NodeManagerService::AsyncService::RequestSubmitTask,
service_handler_, &NodeManagerServiceHandler::HandleSubmitTask, cq,
main_service_));
std::unique_ptr<ServerCallFactory> request_worker_lease_call_factory(
new ServerCallFactoryImpl<NodeManagerService, NodeManagerServiceHandler,
WorkerLeaseRequest, WorkerLeaseReply>(
@ -99,8 +89,6 @@ class NodeManagerGrpcService : public GrpcService {
main_service_));
// Set accept concurrency.
server_call_factories_and_concurrencies->emplace_back(
std::move(submit_task_call_factory), 100);
server_call_factories_and_concurrencies->emplace_back(
std::move(request_worker_lease_call_factory), 100);
server_call_factories_and_concurrencies->emplace_back(