Merge pull request #18 from amplab/msgqueue-fix

clean up queues
This commit is contained in:
Robert Nishihara 2016-03-17 23:04:25 -07:00
commit fae48b5744
2 changed files with 11 additions and 40 deletions

View file

@ -1,21 +1,10 @@
# include "worker.h"
Status WorkerServiceImpl::InvokeCall(ServerContext* context, const InvokeCallRequest* request, InvokeCallReply* reply) {
// TODO(rkn): This method opens a message_queue, which may consume a
// filehandle. This should be changed to only open a queue once in the
// constructor.
call_ = request->call(); // Copy call
ORCH_LOG(ORCH_INFO, "invoked task " << request->call().name());
try {
Call* callptr = &call_;
message_queue mq(open_only, worker_address_.c_str());
mq.send(&callptr, sizeof(Call*), 0);
}
catch(interprocess_exception &ex){
message_queue::remove(worker_address_.c_str());
std::cout << ex.what() << std::endl;
// TODO: return Status;
}
Call* callptr = &call_;
send_queue_.send(&callptr);
return Status::OK;
}
@ -23,15 +12,7 @@ Worker::Worker(const std::string& worker_address, std::shared_ptr<Channel> sched
: worker_address_(worker_address),
scheduler_stub_(Scheduler::NewStub(scheduler_channel)),
objstore_stub_(ObjStore::NewStub(objstore_channel)) {
try {
// This creates the receive message queue.
const char* message_queue_name = worker_address_.c_str();
message_queue::remove(message_queue_name);
receive_queue_ = std::unique_ptr<message_queue>(new message_queue(create_only, message_queue_name, 1, sizeof(Call*)));
}
catch(interprocess_exception &ex) {
std::cout << ex.what() << std::endl;
}
receive_queue_.connect(worker_address_, true);
}
RemoteCallReply Worker::remote_call(RemoteCallRequest* request) {
@ -120,17 +101,9 @@ void Worker::register_function(const std::string& name, size_t num_return_vals)
}
Call* Worker::receive_next_task() {
const char* message_queue_name = worker_address_.c_str();
try {
unsigned int priority;
message_queue::size_type recvd_size;
Call* call;
receive_queue_->receive(&call, sizeof(Call*), recvd_size, priority);
return call;
}
catch(interprocess_exception &ex){
ORCH_LOG(ORCH_FATAL, ex.what());
}
Call* call;
receive_queue_.receive(&call);
return call;
}
void Worker::notify_task_completed() {

View file

@ -6,11 +6,6 @@
#include <string>
#include <thread>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
using namespace boost::interprocess;
#include <grpc++/grpc++.h>
using grpc::Server;
@ -29,11 +24,14 @@ using grpc::ClientWriter;
class WorkerServiceImpl final : public WorkerService::Service {
public:
WorkerServiceImpl(const std::string& worker_address)
: worker_address_(worker_address) {}
: worker_address_(worker_address) {
send_queue_.connect(worker_address_, false);
}
Status InvokeCall(ServerContext* context, const InvokeCallRequest* request, InvokeCallReply* reply) override;
private:
std::string worker_address_;
Call call_; // copy of the current call
MessageQueue<Call*> send_queue_;
};
class Worker {
@ -67,7 +65,7 @@ class Worker {
std::unique_ptr<Scheduler::Stub> scheduler_stub_;
std::unique_ptr<ObjStore::Stub> objstore_stub_;
std::thread worker_server_thread_;
std::unique_ptr<message_queue> receive_queue_;
MessageQueue<Call*> receive_queue_;
managed_shared_memory segment_;
WorkerId workerid_;
std::string worker_address_;