From c1459f3603dc2b6e9a55477dd641069e67d87e0d Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Thu, 17 Mar 2016 22:56:55 -0700 Subject: [PATCH] clean up queues --- src/worker.cc | 39 ++++++--------------------------------- src/worker.h | 12 +++++------- 2 files changed, 11 insertions(+), 40 deletions(-) diff --git a/src/worker.cc b/src/worker.cc index 7c8d33e49..0375896e7 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -1,21 +1,10 @@ # include "worker.h" Status WorkerServiceImpl::InvokeCall(ServerContext* context, const InvokeCallRequest* request, InvokeCallReply* reply) { - // TODO(rkn): This method opens a message_queue, which may consume a - // filehandle. This should be changed to only open a queue once in the - // constructor. call_ = request->call(); // Copy call ORCH_LOG(ORCH_INFO, "invoked task " << request->call().name()); - try { - Call* callptr = &call_; - message_queue mq(open_only, worker_address_.c_str()); - mq.send(&callptr, sizeof(Call*), 0); - } - catch(interprocess_exception &ex){ - message_queue::remove(worker_address_.c_str()); - std::cout << ex.what() << std::endl; - // TODO: return Status; - } + Call* callptr = &call_; + send_queue_.send(&callptr); return Status::OK; } @@ -23,15 +12,7 @@ Worker::Worker(const std::string& worker_address, std::shared_ptr sched : worker_address_(worker_address), scheduler_stub_(Scheduler::NewStub(scheduler_channel)), objstore_stub_(ObjStore::NewStub(objstore_channel)) { - try { - // This creates the receive message queue. - const char* message_queue_name = worker_address_.c_str(); - message_queue::remove(message_queue_name); - receive_queue_ = std::unique_ptr(new message_queue(create_only, message_queue_name, 1, sizeof(Call*))); - } - catch(interprocess_exception &ex) { - std::cout << ex.what() << std::endl; - } + receive_queue_.connect(worker_address_, true); } RemoteCallReply Worker::remote_call(RemoteCallRequest* request) { @@ -120,17 +101,9 @@ void Worker::register_function(const std::string& name, size_t num_return_vals) } Call* Worker::receive_next_task() { - const char* message_queue_name = worker_address_.c_str(); - try { - unsigned int priority; - message_queue::size_type recvd_size; - Call* call; - receive_queue_->receive(&call, sizeof(Call*), recvd_size, priority); - return call; - } - catch(interprocess_exception &ex){ - ORCH_LOG(ORCH_FATAL, ex.what()); - } + Call* call; + receive_queue_.receive(&call); + return call; } void Worker::notify_task_completed() { diff --git a/src/worker.h b/src/worker.h index 3281fb2d1..7e3905275 100644 --- a/src/worker.h +++ b/src/worker.h @@ -6,11 +6,6 @@ #include #include -#include -#include - -using namespace boost::interprocess; - #include using grpc::Server; @@ -29,11 +24,14 @@ using grpc::ClientWriter; class WorkerServiceImpl final : public WorkerService::Service { public: WorkerServiceImpl(const std::string& worker_address) - : worker_address_(worker_address) {} + : worker_address_(worker_address) { + send_queue_.connect(worker_address_, false); + } Status InvokeCall(ServerContext* context, const InvokeCallRequest* request, InvokeCallReply* reply) override; private: std::string worker_address_; Call call_; // copy of the current call + MessageQueue send_queue_; }; class Worker { @@ -67,7 +65,7 @@ class Worker { std::unique_ptr scheduler_stub_; std::unique_ptr objstore_stub_; std::thread worker_server_thread_; - std::unique_ptr receive_queue_; + MessageQueue receive_queue_; managed_shared_memory segment_; WorkerId workerid_; std::string worker_address_;