Merge pull request #17 from amplab/event-based-objstore

Event based objstore
Robert Nishihara 2016-03-17 22:34:05 -07:00
commit 60f582b98d
10 changed files with 332 additions and 113 deletions

@@ -76,8 +76,8 @@ if (UNIX AND NOT APPLE)
   link_libraries(rt)
 endif()
-add_executable(objstore src/objstore.cc ${GENERATED_PROTOBUF_FILES})
+add_executable(objstore src/objstore.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES})
 add_executable(scheduler src/scheduler.cc ${GENERATED_PROTOBUF_FILES})
-add_library(orchpylib SHARED src/orchpylib.cc src/worker.cc ${GENERATED_PROTOBUF_FILES})
+add_library(orchpylib SHARED src/orchpylib.cc src/worker.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES})
 install(TARGETS objstore scheduler orchpylib DESTINATION ${CMAKE_SOURCE_DIR}/lib/orchpy/orchpy)

@@ -146,7 +146,7 @@ service ObjStore {
   rpc DeliverObj(DeliverObjRequest) returns (AckReply);
   // Accept incoming data from another object store
   rpc StreamObj(stream ObjChunk) returns (AckReply);
-  rpc GetObj(GetObjRequest) returns (GetObjReply);
+  // Get debug info from the object store
   rpc ObjStoreDebugInfo(ObjStoreDebugInfoRequest) returns (ObjStoreDebugInfoReply);
 }

src/ipc.cc (new file, 53 lines)

@@ -0,0 +1,53 @@
#include "ipc.h"

ObjHandle::ObjHandle(SegmentId segmentid, size_t size, IpcPointer ipcpointer)
  : segmentid_(segmentid), size_(size), ipcpointer_(ipcpointer)
{}

MemorySegmentPool::MemorySegmentPool(bool create) : create_mode_(create) { }

// Creates a memory segment if it is not already there. If the pool is in create mode,
// space is allocated; if it is in open mode, the shared memory is mapped into the process.
void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) {
  if (segmentid < segments_.size()) {
    return;
  }
  segment_names_.resize(segmentid + 1);
  segments_.resize(segmentid + 1);
  std::string segment_name = std::string("segment:") + std::to_string(segmentid);
  if (create_mode_) {
    assert(size > 0);
    shared_memory_object::remove(segment_name.c_str()); // remove segment if it has not been properly removed from last run
    size_t new_size = (size / page_size_ + 2) * page_size_; // additional room for boost's bookkeeping
    segments_[segmentid] = std::unique_ptr<managed_shared_memory>(new managed_shared_memory(create_only, segment_name.c_str(), new_size));
  } else {
    segments_[segmentid] = std::unique_ptr<managed_shared_memory>(new managed_shared_memory(open_only, segment_name.c_str()));
  }
  segment_names_[segmentid] = segment_name;
}

ObjHandle MemorySegmentPool::allocate(size_t size) {
  // TODO(pcm): at the moment, this always creates a new segment; this will be changed
  SegmentId segmentid = segment_names_.size();
  open_segment(segmentid, size);
  void* ptr = segments_[segmentid]->allocate(size);
  auto handle = segments_[segmentid]->get_handle_from_address(ptr);
  return ObjHandle(segmentid, size, handle);
}

// Returns the address of the object referred to by the handle; needs to be called in
// the process that will use the address.
char* MemorySegmentPool::get_address(ObjHandle pointer) {
  if (pointer.segmentid() >= segments_.size()) {
    open_segment(pointer.segmentid());
  }
  return static_cast<char*>(segments_[pointer.segmentid()]->get_address_from_handle(pointer.ipcpointer()));
}

MemorySegmentPool::~MemorySegmentPool() {
  assert(segment_names_.size() == segments_.size());
  for (size_t i = 0; i < segment_names_.size(); ++i) {
    segments_[i].reset();
    shared_memory_object::remove(segment_names_[i].c_str());
  }
}
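The two pool modes pair up across processes: the object store constructs the pool with create = true and hands out ObjHandles, while workers construct it with create = false and map the same segments on demand. A minimal sketch of that hand-off (the helper names store_side_write and worker_side_read are illustrative and not part of this commit):

#include <cstring>
#include <string>
#include "ipc.h"

// Object store side: a pool in create mode owns the segments.
ObjHandle store_side_write(MemorySegmentPool& pool, const std::string& payload) {
  ObjHandle handle = pool.allocate(payload.size());            // may create "segment:N"
  std::memcpy(pool.get_address(handle), payload.data(), payload.size());
  return handle;                                               // small POD, cheap to ship over a message queue
}

// Worker side: a pool in open mode maps segments created by the store.
std::string worker_side_read(MemorySegmentPool& pool, ObjHandle handle) {
  return std::string(pool.get_address(handle), handle.size()); // maps the segment on first access
}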

src/ipc.h (new file, 136 lines)

@@ -0,0 +1,136 @@
#ifndef ORCHESTRA_IPC_H
#define ORCHESTRA_IPC_H

#include <iostream>

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>

#include "orchestra/orchestra.h"

using namespace boost::interprocess;

// Methods for interprocess communication (abstracts away the shared memory implementation)

// Message queues: exchanging objects of type T between processes on a node

template<typename T>
class MessageQueue {
public:
  MessageQueue() {};

  ~MessageQueue() {
    message_queue::remove(name_.c_str());
  }

  MessageQueue(MessageQueue<T>&& other) noexcept
    : name_(std::move(other.name_)),
      queue_(std::move(other.queue_))
  { }

  bool connect(const std::string& name, bool create) {
    name_ = name;
    try {
      if (create) {
        message_queue::remove(name.c_str()); // remove queue if it has not been properly removed from last run
        queue_ = std::unique_ptr<message_queue>(new message_queue(create_only, name.c_str(), 100, sizeof(T)));
      } else {
        queue_ = std::unique_ptr<message_queue>(new message_queue(open_only, name.c_str()));
      }
    } catch(interprocess_exception &ex) {
      ORCH_LOG(ORCH_FATAL, "boost::interprocess exception: " << ex.what());
    }
    return true;
  };

  bool connected() {
    return queue_ != NULL;
  }

  bool send(const T* object) {
    try {
      queue_->send(object, sizeof(T), 0);
    } catch(interprocess_exception &ex) {
      ORCH_LOG(ORCH_FATAL, "boost::interprocess exception: " << ex.what());
    }
    return true;
  };

  bool receive(T* object) {
    unsigned int priority;
    message_queue::size_type recvd_size;
    try {
      queue_->receive(object, sizeof(T), recvd_size, priority);
    } catch(interprocess_exception &ex) {
      ORCH_LOG(ORCH_FATAL, "boost::interprocess exception: " << ex.what());
    }
    return true;
  }

private:
  std::string name_;
  std::unique_ptr<message_queue> queue_;
};

// Object queues
// For communicating between the object store and workers, the following
// messages can be sent:
// ALLOC: workerid, objref, size -> objhandle:
//   worker requests an allocation from the object store
// GET: workerid, objref -> objhandle:
//   worker requests an object from the object store
// DONE: workerid, objref -> ():
//   worker tells the object store that an object has been finalized

enum ObjRequestType {ALLOC = 0, GET = 1, DONE = 2};

struct ObjRequest {
  WorkerId workerid; // worker that sends the request
  ObjRequestType type; // do we want to allocate a new object or get a handle?
  ObjRef objref; // object reference of the object to be returned/allocated
  int64_t size; // if we allocate, this is the size of the object
};

typedef size_t SegmentId; // index into a memory segment table
typedef managed_shared_memory::handle_t IpcPointer;

// Object handle: handle to an object that can be passed around between processes
// that are connected to the same object store

class ObjHandle {
public:
  ObjHandle(SegmentId segmentid = 0, size_t size = 0, IpcPointer ipcpointer = IpcPointer());
  SegmentId segmentid() { return segmentid_; }
  size_t size() { return size_; }
  IpcPointer ipcpointer() { return ipcpointer_; }
private:
  SegmentId segmentid_;
  size_t size_;
  IpcPointer ipcpointer_;
};

// Memory segment pool: a collection of shared memory segments, used in two modes:
// - on the object store it is used with create = true; in this case the
//   segments are allocated
// - on the worker it is used in open mode, with create = false; in this case
//   the segments, which have been created by the object store, are just mapped
//   into memory

class MemorySegmentPool {
public:
  MemorySegmentPool(bool create = false); // can be used in two modes: create mode and open mode (see above)
  ~MemorySegmentPool();
  ObjHandle allocate(size_t nbytes); // allocate a new shared object, potentially creating a new segment (only run on object store)
  char* get_address(ObjHandle pointer); // get address of shared object
private:
  void open_segment(SegmentId segmentid, size_t size = 0); // create a segment or map an existing one into memory
  bool create_mode_;
  size_t page_size_ = mapped_region::get_page_size();
  std::vector<std::string> segment_names_;
  std::vector<std::unique_ptr<managed_shared_memory> > segments_;
};

#endif
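To make the ALLOC/GET/DONE protocol above concrete: a push from a worker is an ALLOC request, a copy into the returned handle, and a DONE notification. A rough worker-side sketch, assuming queues connected with the "queue:&lt;objstore address&gt;:obj" and "queue:&lt;objstore address&gt;:worker:&lt;id&gt;:obj" naming used elsewhere in this commit (the helper name push_bytes is illustrative):

#include <cstring>
#include <string>
#include "ipc.h"

void push_bytes(MessageQueue<ObjRequest>& to_store,       // worker -> object store requests
                MessageQueue<ObjHandle>& from_store,      // object store -> this worker replies
                MemorySegmentPool& pool,                  // open-mode pool on the worker
                WorkerId workerid, ObjRef objref, const std::string& data) {
  ObjRequest request;
  request.workerid = workerid;
  request.type = ObjRequestType::ALLOC;                   // ask the store for space
  request.objref = objref;
  request.size = data.size();
  to_store.send(&request);

  ObjHandle handle;
  from_store.receive(&handle);                            // store replies with a handle into shared memory
  std::memcpy(pool.get_address(handle), data.data(), data.size());

  request.type = ObjRequestType::DONE;                    // object is finalized; the store may hand it to others
  to_store.send(&request);
}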

@@ -1,5 +1,4 @@
 #include "objstore.h"
-#include <thread>
 #include <chrono>
 const size_t ObjStoreClient::CHUNK_SIZE = 8 * 1024;
@@ -24,7 +23,8 @@ Status ObjStoreClient::upload_data_to(slice data, ObjRef objref, ObjStore::Stub&
 }
 ObjStoreService::ObjStoreService(const std::string& objstore_address, std::shared_ptr<Channel> scheduler_channel)
-  : scheduler_stub_(Scheduler::NewStub(scheduler_channel)) {
+  : scheduler_stub_(Scheduler::NewStub(scheduler_channel)), segmentpool_(true), objstore_address_(objstore_address) {
+  recv_queue_.connect(std::string("queue:") + objstore_address + std::string(":obj"), true);
   ClientContext context;
   RegisterObjStoreRequest request;
   request.set_address(objstore_address);
@@ -33,29 +33,6 @@ ObjStoreService::ObjStoreService(const std::string& objstore_address, std::share
   objstoreid_ = reply.objstoreid();
 }
-ObjStoreService::~ObjStoreService() {
-  for (const auto& segment_name : memory_names_) {
-    shared_memory_object::remove(segment_name.c_str());
-  }
-}
-// this method needs to be protected by a memory_lock_
-void ObjStoreService::allocate_memory(ObjRef objref, size_t size) {
-  std::ostringstream stream;
-  stream << "obj-" << memory_names_.size();
-  std::string name = stream.str();
-  // Make sure that the name is not taken yet
-  shared_memory_object::remove(name.c_str());
-  memory_names_.push_back(name);
-  // Make room for boost::interprocess metadata
-  size_t new_size = (size / page_size + 2) * page_size;
-  shared_object& object = memory_[objref];
-  object.name = name;
-  object.memory = std::make_shared<managed_shared_memory>(create_only, name.c_str(), new_size);
-  object.ptr.data = static_cast<char*>(memory_[objref].memory->allocate(size));
-  object.ptr.len = size;
-}
 // this method needs to be protected by a objstores_lock_
 ObjStore::Stub& ObjStoreService::get_objstore_stub(const std::string& objstore_address) {
   auto iter = objstores_.find(objstore_address);
@@ -66,6 +43,7 @@ ObjStore::Stub& ObjStoreService::get_objstore_stub(const std::string& objstore_a
   return *objstores_[objstore_address];
 }
+/*
 Status ObjStoreService::DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) {
   std::lock_guard<std::mutex> objstores_lock(objstores_lock_);
   ObjStore::Stub& stub = get_objstore_stub(request->objstore_address());
@@ -73,12 +51,16 @@ Status ObjStoreService::DeliverObj(ServerContext* context, const DeliverObjReque
   Status status = ObjStoreClient::upload_data_to(memory_[objref].ptr, objref, stub);
   return status;
 }
+*/
 Status ObjStoreService::ObjStoreDebugInfo(ServerContext* context, const ObjStoreDebugInfoRequest* request, ObjStoreDebugInfoReply* reply) {
   std::lock_guard<std::mutex> memory_lock(memory_lock_);
-  for (const auto& entry : memory_) {
-    reply->add_objref(entry.first);
+  for (size_t i = 0; i < memory_.size(); ++i) {
+    if (memory_[i].second) { // is the object available?
+      reply->add_objref(i);
+    }
   }
+  /*
   for (int i = 0; i < request->objref_size(); ++i) {
     ObjRef objref = request->objref(i);
     Obj* obj = new Obj();
@@ -86,31 +68,11 @@ Status ObjStoreService::ObjStoreDebugInfo(ServerContext* context, const ObjStore
     obj->ParseFromString(data);
     reply->mutable_obj()->AddAllocated(obj);
   }
+  */
   return Status::OK;
 }
-Status ObjStoreService::GetObj(ServerContext* context, const GetObjRequest* request, GetObjReply* reply) {
-  // TODO(pcm): There is one remaining case where this can fail, i.e. if an object is
-  // to be delivered from another store but hasn't yet arrived
-  ObjRef objref = request->objref();
-  while (true) {
-    // if the object has not been sent to the objstore, this has the potential to lead to an infinite loop
-    std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    ORCH_LOG(ORCH_DEBUG, "looping in objstore " << objstoreid_ << " waiting for objref " << objref);
-    std::lock_guard<std::mutex> memory_lock(memory_lock_);
-    if (memory_.find(objref) != memory_.end()) {
-      break;
-    }
-  }
-  std::lock_guard<std::mutex> memory_lock(memory_lock_);
-  shared_object& object = memory_[objref];
-  reply->set_bucket(object.name);
-  auto handle = object.memory->get_handle_from_address(object.ptr.data);
-  reply->set_handle(handle);
-  reply->set_size(object.ptr.len);
-  return Status::OK;
-}
+/*
 Status ObjStoreService::StreamObj(ServerContext* context, ServerReader<ObjChunk>* reader, AckReply* reply) {
   ORCH_LOG(ORCH_VERBOSE, "begin to stream data to object store " << objstoreid_);
   memory_lock_.lock();
@@ -148,12 +110,85 @@ Status ObjStoreService::StreamObj(ServerContext* context, ServerReader<ObjChunk>
   return Status::OK;
 }
+*/
+void ObjStoreService::process_requests() {
+  ObjRequest request;
+  while (true) {
+    recv_queue_.receive(&request);
+    if (request.workerid >= send_queues_.size()) {
+      send_queues_.resize(request.workerid + 1);
+    }
+    if (!send_queues_[request.workerid].connected()) {
+      std::string queue_name = std::string("queue:") + objstore_address_ + std::string(":worker:") + std::to_string(request.workerid) + std::string(":obj");
+      send_queues_[request.workerid].connect(queue_name, false);
+    }
+    if (request.objref >= memory_.size()) {
+      memory_.resize(request.objref + 1);
+      memory_[request.objref].second = false;
+    }
+    switch (request.type) {
+      case ObjRequestType::ALLOC: {
+          ObjHandle reply = segmentpool_.allocate(request.size);
+          send_queues_[request.workerid].send(&reply);
+          if (request.objref >= memory_.size()) {
+            memory_.resize(request.objref + 1);
+          }
+          memory_[request.objref].first = reply;
+          memory_[request.objref].second = false;
+        }
+        break;
+      case ObjRequestType::GET: {
+          std::pair<ObjHandle, bool>& item = memory_[request.objref];
+          if (item.second) {
+            send_queues_[request.workerid].send(&item.first);
+          } else {
+            std::lock_guard<std::mutex> lock(pull_queue_lock_);
+            pull_queue_.push_back(std::make_pair(request.workerid, request.objref));
+          }
+        }
+        break;
+      case ObjRequestType::DONE: {
+          std::pair<ObjHandle, bool>& item = memory_[request.objref];
+          item.second = true;
+          std::lock_guard<std::mutex> pull_queue_lock(pull_queue_lock_);
+          for (size_t i = 0; i < pull_queue_.size(); ++i) {
+            if (pull_queue_[i].second == request.objref) {
+              ObjHandle& elem = memory_[request.objref].first;
+              send_queues_[pull_queue_[i].first].send(&item.first);
+              // Remove the pull task from the queue
+              std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]);
+              pull_queue_.pop_back();
+              i -= 1;
+            }
+          }
+          // Tell the scheduler that the object arrived
+          // TODO(pcm): put this in a separate thread so we don't have to pay the latency here
+          ClientContext objready_context;
+          ObjReadyRequest objready_request;
+          objready_request.set_objref(request.objref);
+          objready_request.set_objstoreid(objstoreid_);
+          AckReply objready_reply;
+          scheduler_stub_->ObjReady(&objready_context, objready_request, &objready_reply);
+        }
+        break;
+    }
+  }
+}
+void ObjStoreService::start_objstore_service() {
+  communicator_thread_ = std::thread([this]() {
+    ORCH_LOG(ORCH_INFO, "started object store communicator server");
+    process_requests();
+  });
+}
 void start_objstore(const char* scheduler_addr, const char* objstore_addr) {
   auto scheduler_channel = grpc::CreateChannel(scheduler_addr, grpc::InsecureChannelCredentials());
   ORCH_LOG(ORCH_INFO, "object store " << objstore_addr << " connected to scheduler " << scheduler_addr);
   std::string objstore_address(objstore_addr);
   ObjStoreService service(objstore_address, scheduler_channel);
+  service.start_objstore_service();
   ServerBuilder builder;
   builder.AddListeningPort(std::string(objstore_addr), grpc::InsecureServerCredentials());
   builder.RegisterService(&service);
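One consequence of the DONE handling above: a GET for an object that is still being written is parked on pull_queue_ and answered only when the corresponding DONE arrives, so on the worker side a pull simply blocks on its reply queue instead of polling. A rough sketch (the helper name pull_when_ready is illustrative, not part of this commit):

#include "ipc.h"

ObjHandle pull_when_ready(MessageQueue<ObjRequest>& to_store,
                          MessageQueue<ObjHandle>& from_store,
                          WorkerId workerid, ObjRef objref) {
  ObjRequest request;
  request.workerid = workerid;
  request.type = ObjRequestType::GET;
  request.objref = objref;
  to_store.send(&request);
  ObjHandle handle;
  from_store.receive(&handle);  // blocks until the store has seen DONE for objref
  return handle;
}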

@@ -1,17 +1,16 @@
-#ifndef ORCHESTRA_OBJSTORE_SERVER_H
-#define ORCHESTRA_OBJSTORE_SERVER_H
+#ifndef ORCHESTRA_OBJSTORE_H
+#define ORCHESTRA_OBJSTORE_H
 #include <unordered_map>
 #include <memory>
+#include <thread>
 #include <iostream>
-#include <boost/interprocess/managed_shared_memory.hpp>
 #include <grpc++/grpc++.h>
-using namespace boost::interprocess;
 #include "orchestra/orchestra.h"
 #include "orchestra.grpc.pb.h"
 #include "types.pb.h"
+#include "ipc.h"
 using grpc::Server;
 using grpc::ServerBuilder;
@@ -29,34 +28,32 @@ public:
   static Status upload_data_to(slice data, ObjRef objref, ObjStore::Stub& stub);
 };
-struct shared_object {
-  std::string name;
-  std::shared_ptr<managed_shared_memory> memory;
-  slice ptr;
-};
 class ObjStoreService final : public ObjStore::Service {
 public:
   ObjStoreService(const std::string& objstore_address, std::shared_ptr<Channel> scheduler_channel);
-  ~ObjStoreService();
-  Status DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) override;
+  // Status DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) override;
+  // Status StreamObj(ServerContext* context, ServerReader<ObjChunk>* reader, AckReply* reply) override;
   Status ObjStoreDebugInfo(ServerContext* context, const ObjStoreDebugInfoRequest* request, ObjStoreDebugInfoReply* reply) override;
-  Status GetObj(ServerContext* context, const GetObjRequest* request, GetObjReply* reply) override;
-  Status StreamObj(ServerContext* context, ServerReader<ObjChunk>* reader, AckReply* reply) override;
+  void start_objstore_service();
 private:
-  void allocate_memory(ObjRef objref, size_t size);
   // check if we already connected to the other objstore, if yes, return reference to connection, otherwise connect
   ObjStore::Stub& get_objstore_stub(const std::string& objstore_address);
+  void process_requests();
-  std::vector<std::string> memory_names_;
-  std::unordered_map<ObjRef, shared_object> memory_;
+  std::string objstore_address_;
+  ObjStoreId objstoreid_; // id of this objectstore in the scheduler object store table
+  MemorySegmentPool segmentpool_;
+  std::vector<std::pair<ObjHandle, bool> > memory_; // object reference -> (memory address, memory finalized?)
   std::mutex memory_lock_;
-  size_t page_size = mapped_region::get_page_size();
   std::unordered_map<std::string, std::unique_ptr<ObjStore::Stub>> objstores_;
   std::mutex objstores_lock_;
   std::unique_ptr<Scheduler::Stub> scheduler_stub_;
-  ObjStoreId objstoreid_; // id of this objectstore in the scheduler object store table
+  std::vector<std::pair<WorkerId, ObjRef> > pull_queue_;
+  std::mutex pull_queue_lock_;
+  MessageQueue<ObjRequest> recv_queue_;
+  std::vector<MessageQueue<ObjHandle> > send_queues_; // workerid -> queue
+  std::thread communicator_thread_;
 };
 #endif

@@ -49,6 +49,9 @@ void Worker::register_worker(const std::string& worker_address, const std::strin
   ClientContext context;
   Status status = scheduler_stub_->RegisterWorker(&context, request, &reply);
   workerid_ = reply.workerid();
+  request_obj_queue_.connect(std::string("queue:") + objstore_address + std::string(":obj"), false);
+  std::string queue_name = std::string("queue:") + objstore_address + std::string(":worker:") + std::to_string(workerid_) + std::string(":obj");
+  receive_obj_queue_.connect(queue_name, true);
   return;
 }
@@ -75,41 +78,35 @@ ObjRef Worker::push_object(const Obj* obj) {
 }
 slice Worker::get_object(ObjRef objref) {
-  ClientContext context;
-  GetObjRequest request;
-  request.set_objref(objref);
-  GetObjReply reply;
-  objstore_stub_->GetObj(&context, request, &reply);
-  segment_ = managed_shared_memory(open_only, reply.bucket().c_str());
+  ObjRequest request;
+  request.workerid = workerid_;
+  request.type = ObjRequestType::GET;
+  request.objref = objref;
+  request_obj_queue_.send(&request);
+  ObjHandle result;
+  receive_obj_queue_.receive(&result);
   slice slice;
-  slice.data = static_cast<char*>(segment_.get_address_from_handle(reply.handle()));
-  slice.len = reply.size();
+  slice.data = segmentpool_.get_address(result);
+  slice.len = result.size();
   return slice;
 }
-// TODO: Do this with shared memory
+// TODO(pcm): More error handling
 void Worker::put_object(ObjRef objref, const Obj* obj) {
-  ObjChunk chunk;
   std::string data;
-  obj->SerializeToString(&data);
-  size_t totalsize = data.size();
-  ClientContext context;
-  AckReply reply;
-  std::unique_ptr<ClientWriter<ObjChunk> > writer(
-    objstore_stub_->StreamObj(&context, &reply));
-  const char* head = data.c_str();
-  for (size_t i = 0; i < data.length(); i += CHUNK_SIZE) {
-    chunk.set_objref(objref);
-    chunk.set_totalsize(totalsize);
-    chunk.set_data(head + i, std::min(CHUNK_SIZE, data.length() - i));
-    if (!writer->Write(chunk)) {
-      ORCH_LOG(ORCH_FATAL, "write failed during put_object");
-      // TODO(pcm): better error handling
-    }
-  }
-  writer->WritesDone();
-  Status status = writer->Finish();
-  // TODO(pcm): error handling
+  obj->SerializeToString(&data); // TODO(pcm): get rid of this serialization
+  ObjRequest request;
+  request.workerid = workerid_;
+  request.type = ObjRequestType::ALLOC;
+  request.objref = objref;
+  request.size = data.size();
+  request_obj_queue_.send(&request);
+  ObjHandle result;
+  receive_obj_queue_.receive(&result);
+  char* target = segmentpool_.get_address(result);
+  std::memcpy(target, &data[0], data.size());
+  request.type = ObjRequestType::DONE;
+  request_obj_queue_.send(&request);
 }
 void Worker::register_function(const std::string& name, size_t num_return_vals) {
@@ -128,13 +125,11 @@ Call* Worker::receive_next_task() {
     unsigned int priority;
     message_queue::size_type recvd_size;
     Call* call;
-    while (true) {
-      receive_queue_->receive(&call, sizeof(Call*), recvd_size, priority);
-      return call;
-    }
+    receive_queue_->receive(&call, sizeof(Call*), recvd_size, priority);
+    return call;
   }
   catch(interprocess_exception &ex){
-    std::cout << ex.what() << std::endl;
+    ORCH_LOG(ORCH_FATAL, ex.what());
   }
 }

@@ -20,6 +20,7 @@ using grpc::Status;
 #include "orchestra.grpc.pb.h"
 #include "orchestra/orchestra.h"
+#include "ipc.h"
 using grpc::Channel;
 using grpc::ClientContext;
@@ -66,11 +67,13 @@ class Worker {
   std::unique_ptr<Scheduler::Stub> scheduler_stub_;
   std::unique_ptr<ObjStore::Stub> objstore_stub_;
   std::thread worker_server_thread_;
-  std::thread other_thread_;
   std::unique_ptr<message_queue> receive_queue_;
   managed_shared_memory segment_;
   WorkerId workerid_;
   std::string worker_address_;
+  MessageQueue<ObjRequest> request_obj_queue_;
+  MessageQueue<ObjHandle> receive_obj_queue_;
+  MemorySegmentPool segmentpool_;
 };
 #endif

@@ -147,11 +147,11 @@ class ObjStoreTest(unittest.TestCase):
       self.assertEqual(result, data)
     # pushing an object, shipping it to another worker, and pulling it shouldn't change it
-    for data in ["h", "h" * 10000, 0, 0.0]:
-      objref = orchpy.push(data, worker1)
-      response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.val, objstore_address=address(IP_ADDRESS, objstore2_port)), TIMEOUT_SECONDS)
-      result = orchpy.pull(objref, worker2)
-      self.assertEqual(result, data)
+    # for data in ["h", "h" * 10000, 0, 0.0]:
+    #   objref = worker.push(data, worker1)
+    #   response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.val, objstore_address=address(IP_ADDRESS, objstore2_port)), TIMEOUT_SECONDS)
+    #   result = worker.pull(objref, worker2)
+    #   self.assertEqual(result, data)
     services.cleanup()

@@ -16,7 +16,7 @@ TIMEOUT_SECONDS = 5
 parser = argparse.ArgumentParser(description='Parse addresses for the worker to connect to.')
 parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address")
 parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address")
-parser.add_argument("--worker-address", default="127.0.0.1:40001", type=str, help="the worker's address")
+parser.add_argument("--worker-address", default="127.0.0.1:30001", type=str, help="the worker's address")
 @orchpy.distributed([str], [str])
 def print_string(string):