diff --git a/CMakeLists.txt b/CMakeLists.txt index 0650f2a8d..258226855 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -76,8 +76,8 @@ if (UNIX AND NOT APPLE) link_libraries(rt) endif() -add_executable(objstore src/objstore.cc ${GENERATED_PROTOBUF_FILES}) +add_executable(objstore src/objstore.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) add_executable(scheduler src/scheduler.cc ${GENERATED_PROTOBUF_FILES}) -add_library(orchpylib SHARED src/orchpylib.cc src/worker.cc ${GENERATED_PROTOBUF_FILES}) +add_library(orchpylib SHARED src/orchpylib.cc src/worker.cc src/ipc.cc ${GENERATED_PROTOBUF_FILES}) install(TARGETS objstore scheduler orchpylib DESTINATION ${CMAKE_SOURCE_DIR}/lib/orchpy/orchpy) diff --git a/protos/orchestra.proto b/protos/orchestra.proto index f31673275..638a7c9a5 100644 --- a/protos/orchestra.proto +++ b/protos/orchestra.proto @@ -146,7 +146,7 @@ service ObjStore { rpc DeliverObj(DeliverObjRequest) returns (AckReply); // Accept incoming data from another object store rpc StreamObj(stream ObjChunk) returns (AckReply); - rpc GetObj(GetObjRequest) returns (GetObjReply); + // Get debug info from the object store rpc ObjStoreDebugInfo(ObjStoreDebugInfoRequest) returns (ObjStoreDebugInfoReply); } diff --git a/src/ipc.cc b/src/ipc.cc new file mode 100644 index 000000000..a46030a42 --- /dev/null +++ b/src/ipc.cc @@ -0,0 +1,53 @@ +#include "ipc.h" + +ObjHandle::ObjHandle(SegmentId segmentid, size_t size, IpcPointer ipcpointer) + : segmentid_(segmentid), size_(size), ipcpointer_(ipcpointer) +{} + +MemorySegmentPool::MemorySegmentPool(bool create) : create_mode_(create) { } + +// creates a memory segment if it is not already there; if the pool is in create mode, +// space is allocated, if it is in open mode, the shared memory is mapped into the process +void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) { + if (segmentid < segments_.size()) { + return; + } + segment_names_.resize(segmentid + 1); + segments_.resize(segmentid + 1); + std::string segment_name = std::string("segment:") + std::to_string(segmentid); + if (create_mode_) { + assert(size > 0); + shared_memory_object::remove(segment_name.c_str()); // remove segment if it has not been properly removed from last run + size_t new_size = (size / page_size_ + 2) * page_size_; // additional room for boost's bookkeeping + segments_[segmentid] = std::unique_ptr(new managed_shared_memory(create_only, segment_name.c_str(), new_size)); + } else { + segments_[segmentid] = std::unique_ptr(new managed_shared_memory(open_only, segment_name.c_str())); + } + segment_names_[segmentid] = segment_name; +} + +ObjHandle MemorySegmentPool::allocate(size_t size) { + // TODO(pcm): at the moment, this always creates a new segment, this will be changed + SegmentId segmentid = segment_names_.size(); + open_segment(segmentid, size); + void* ptr = segments_[segmentid]->allocate(size); + auto handle = segments_[segmentid]->get_handle_from_address(ptr); + return ObjHandle(segmentid, size, handle); +} + +// returns address of the object refered to by the handle, needs to be called on +// the process that will use the address +char* MemorySegmentPool::get_address(ObjHandle pointer) { + if (pointer.segmentid() >= segments_.size()) { + open_segment(pointer.segmentid()); + } + return static_cast(segments_[pointer.segmentid()]->get_address_from_handle(pointer.ipcpointer())); +} + +MemorySegmentPool::~MemorySegmentPool() { + assert(segment_names_.size() == segments_.size()); + for (size_t i = 0; i < segment_names_.size(); ++i) { + segments_[i].reset(); + shared_memory_object::remove(segment_names_[i].c_str()); + } +} diff --git a/src/ipc.h b/src/ipc.h new file mode 100644 index 000000000..9e2e764f2 --- /dev/null +++ b/src/ipc.h @@ -0,0 +1,136 @@ +#ifndef ORCHESTRA_IPC_H +#define ORCHESTRA_IPC_H + +#include + +#include +#include + +#include "orchestra/orchestra.h" + +using namespace boost::interprocess; + +// Methods for inter process communication (abstracts from the shared memory implementation) + +// Message Queues: Exchanging objects of type T between processes on a node + +template +class MessageQueue { +public: + MessageQueue() {}; + + ~MessageQueue() { + message_queue::remove(name_.c_str()); + } + + MessageQueue(MessageQueue&& other) noexcept + : name_(std::move(other.name_)), + queue_(std::move(other.queue_)) + { } + + bool connect(const std::string& name, bool create) { + name_ = name; + try { + if (create) { + message_queue::remove(name.c_str()); // remove queue if it has not been properly removed from last run + queue_ = std::unique_ptr(new message_queue(create_only, name.c_str(), 100, sizeof(T))); + } else { + queue_ = std::unique_ptr(new message_queue(open_only, name.c_str())); + } + } catch(interprocess_exception &ex) { + ORCH_LOG(ORCH_FATAL, "boost::interprocess exception: " << ex.what()); + } + return true; + }; + + bool connected() { + return queue_ != NULL; + } + + bool send(const T* object) { + try { + queue_->send(object, sizeof(T), 0); + } catch(interprocess_exception &ex) { + ORCH_LOG(ORCH_FATAL, "boost::interprocess exception: " << ex.what()); + } + return true; + }; + + bool receive(T* object) { + unsigned int priority; + message_queue::size_type recvd_size; + try { + queue_->receive(object, sizeof(T), recvd_size, priority); + } catch(interprocess_exception &ex) { + ORCH_LOG(ORCH_FATAL, "boost::interprocess exception: " << ex.what()); + } + return true; + } + +private: + std::string name_; + std::unique_ptr queue_; +}; + +// Object Queues + +// For communicating between object store and workers, the following +// messages can be sent: + +// ALLOC: workerid, objref, size -> objhandle: +// worker requests an allocation from the object store +// GET: workerid, objref -> objhandle: +// worker requests an object from the object store +// DONE: workerid, objref -> (): +// worker tells the object store that an object has been finalized + +enum ObjRequestType {ALLOC = 0, GET = 1, DONE = 2}; + +struct ObjRequest { + WorkerId workerid; // worker that sends the request + ObjRequestType type; // do we want to allocate a new object or get a handle? + ObjRef objref; // object reference of the object to be returned/allocated + int64_t size; // if allocate, that's the size of the object +}; + +typedef size_t SegmentId; // index into a memory segment table +typedef managed_shared_memory::handle_t IpcPointer; + +// Object handle: Handle to object that can be passed around between processes +// that are connected to the same object store + +class ObjHandle { +public: + ObjHandle(SegmentId segmentid = 0, size_t size = 0, IpcPointer ipcpointer = IpcPointer()); + SegmentId segmentid() { return segmentid_; } + size_t size() { return size_; } + IpcPointer ipcpointer() { return ipcpointer_; } +private: + SegmentId segmentid_; + size_t size_; + IpcPointer ipcpointer_; +}; + +// Memory segment pool: A collection of shared memory segments +// used in two modes: +// \item on the object store it is used with create = true, in this case the +// segments are allocated +// \item on the worker it is used in open mode, with create = false, in this case +// the segments, which have been created by the object store, are just mapped +// into memory + +class MemorySegmentPool { +public: + MemorySegmentPool(bool create = false); // can be used in two modes: create mode and open mode (see above) + ~MemorySegmentPool(); + ObjHandle allocate(size_t nbytes); // allocate a new shared object, potentially creating a new segment (only run on object store) + char* get_address(ObjHandle pointer); // get address of shared object +private: + void open_segment(SegmentId segmentid, size_t size = 0); // create a segment or map an existing one into memory + bool create_mode_; + size_t page_size_ = mapped_region::get_page_size(); + std::vector segment_names_; + std::vector > segments_; +}; + +#endif diff --git a/src/objstore.cc b/src/objstore.cc index 17aa3004d..635c93278 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -1,5 +1,4 @@ #include "objstore.h" -#include #include const size_t ObjStoreClient::CHUNK_SIZE = 8 * 1024; @@ -24,7 +23,8 @@ Status ObjStoreClient::upload_data_to(slice data, ObjRef objref, ObjStore::Stub& } ObjStoreService::ObjStoreService(const std::string& objstore_address, std::shared_ptr scheduler_channel) - : scheduler_stub_(Scheduler::NewStub(scheduler_channel)) { + : scheduler_stub_(Scheduler::NewStub(scheduler_channel)), segmentpool_(true), objstore_address_(objstore_address) { + recv_queue_.connect(std::string("queue:") + objstore_address + std::string(":obj"), true); ClientContext context; RegisterObjStoreRequest request; request.set_address(objstore_address); @@ -33,29 +33,6 @@ ObjStoreService::ObjStoreService(const std::string& objstore_address, std::share objstoreid_ = reply.objstoreid(); } -ObjStoreService::~ObjStoreService() { - for (const auto& segment_name : memory_names_) { - shared_memory_object::remove(segment_name.c_str()); - } -} - -// this method needs to be protected by a memory_lock_ -void ObjStoreService::allocate_memory(ObjRef objref, size_t size) { - std::ostringstream stream; - stream << "obj-" << memory_names_.size(); - std::string name = stream.str(); - // Make sure that the name is not taken yet - shared_memory_object::remove(name.c_str()); - memory_names_.push_back(name); - // Make room for boost::interprocess metadata - size_t new_size = (size / page_size + 2) * page_size; - shared_object& object = memory_[objref]; - object.name = name; - object.memory = std::make_shared(create_only, name.c_str(), new_size); - object.ptr.data = static_cast(memory_[objref].memory->allocate(size)); - object.ptr.len = size; -} - // this method needs to be protected by a objstores_lock_ ObjStore::Stub& ObjStoreService::get_objstore_stub(const std::string& objstore_address) { auto iter = objstores_.find(objstore_address); @@ -66,6 +43,7 @@ ObjStore::Stub& ObjStoreService::get_objstore_stub(const std::string& objstore_a return *objstores_[objstore_address]; } +/* Status ObjStoreService::DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) { std::lock_guard objstores_lock(objstores_lock_); ObjStore::Stub& stub = get_objstore_stub(request->objstore_address()); @@ -73,12 +51,16 @@ Status ObjStoreService::DeliverObj(ServerContext* context, const DeliverObjReque Status status = ObjStoreClient::upload_data_to(memory_[objref].ptr, objref, stub); return status; } +*/ Status ObjStoreService::ObjStoreDebugInfo(ServerContext* context, const ObjStoreDebugInfoRequest* request, ObjStoreDebugInfoReply* reply) { std::lock_guard memory_lock(memory_lock_); - for (const auto& entry : memory_) { - reply->add_objref(entry.first); + for (size_t i = 0; i < memory_.size(); ++i) { + if (memory_[i].second) { // is the object available? + reply->add_objref(i); + } } + /* for (int i = 0; i < request->objref_size(); ++i) { ObjRef objref = request->objref(i); Obj* obj = new Obj(); @@ -86,31 +68,11 @@ Status ObjStoreService::ObjStoreDebugInfo(ServerContext* context, const ObjStore obj->ParseFromString(data); reply->mutable_obj()->AddAllocated(obj); } + */ return Status::OK; } -Status ObjStoreService::GetObj(ServerContext* context, const GetObjRequest* request, GetObjReply* reply) { - // TODO(pcm): There is one remaining case where this can fail, i.e. if an object is - // to be delivered from another store but hasn't yet arrived - ObjRef objref = request->objref(); - while (true) { - // if the object has not been sent to the objstore, this has the potential to lead to an infinite loop - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - ORCH_LOG(ORCH_DEBUG, "looping in objstore " << objstoreid_ << " waiting for objref " << objref); - std::lock_guard memory_lock(memory_lock_); - if (memory_.find(objref) != memory_.end()) { - break; - } - } - std::lock_guard memory_lock(memory_lock_); - shared_object& object = memory_[objref]; - reply->set_bucket(object.name); - auto handle = object.memory->get_handle_from_address(object.ptr.data); - reply->set_handle(handle); - reply->set_size(object.ptr.len); - return Status::OK; -} - +/* Status ObjStoreService::StreamObj(ServerContext* context, ServerReader* reader, AckReply* reply) { ORCH_LOG(ORCH_VERBOSE, "begin to stream data to object store " << objstoreid_); memory_lock_.lock(); @@ -148,12 +110,85 @@ Status ObjStoreService::StreamObj(ServerContext* context, ServerReader return Status::OK; } +*/ + +void ObjStoreService::process_requests() { + ObjRequest request; + while (true) { + recv_queue_.receive(&request); + if (request.workerid >= send_queues_.size()) { + send_queues_.resize(request.workerid + 1); + } + if (!send_queues_[request.workerid].connected()) { + std::string queue_name = std::string("queue:") + objstore_address_ + std::string(":worker:") + std::to_string(request.workerid) + std::string(":obj"); + send_queues_[request.workerid].connect(queue_name, false); + } + if (request.objref >= memory_.size()) { + memory_.resize(request.objref + 1); + memory_[request.objref].second = false; + } + switch (request.type) { + case ObjRequestType::ALLOC: { + ObjHandle reply = segmentpool_.allocate(request.size); + send_queues_[request.workerid].send(&reply); + if (request.objref >= memory_.size()) { + memory_.resize(request.objref + 1); + } + memory_[request.objref].first = reply; + memory_[request.objref].second = false; + } + break; + case ObjRequestType::GET: { + std::pair& item = memory_[request.objref]; + if (item.second) { + send_queues_[request.workerid].send(&item.first); + } else { + std::lock_guard lock(pull_queue_lock_); + pull_queue_.push_back(std::make_pair(request.workerid, request.objref)); + } + } + break; + case ObjRequestType::DONE: { + std::pair& item = memory_[request.objref]; + item.second = true; + std::lock_guard pull_queue_lock(pull_queue_lock_); + for (size_t i = 0; i < pull_queue_.size(); ++i) { + if (pull_queue_[i].second == request.objref) { + ObjHandle& elem = memory_[request.objref].first; + send_queues_[pull_queue_[i].first].send(&item.first); + // Remove the pull task from the queue + std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]); + pull_queue_.pop_back(); + i -= 1; + } + } + // Tell the scheduler that the object arrived + // TODO(pcm): put this in a separate thread so we don't have to pay the latency here + ClientContext objready_context; + ObjReadyRequest objready_request; + objready_request.set_objref(request.objref); + objready_request.set_objstoreid(objstoreid_); + AckReply objready_reply; + scheduler_stub_->ObjReady(&objready_context, objready_request, &objready_reply); + } + break; + } + } +} + +void ObjStoreService::start_objstore_service() { + communicator_thread_ = std::thread([this]() { + ORCH_LOG(ORCH_INFO, "started object store communicator server"); + process_requests(); + }); +} void start_objstore(const char* scheduler_addr, const char* objstore_addr) { auto scheduler_channel = grpc::CreateChannel(scheduler_addr, grpc::InsecureChannelCredentials()); ORCH_LOG(ORCH_INFO, "object store " << objstore_addr << " connected to scheduler " << scheduler_addr); std::string objstore_address(objstore_addr); ObjStoreService service(objstore_address, scheduler_channel); + service.start_objstore_service(); ServerBuilder builder; builder.AddListeningPort(std::string(objstore_addr), grpc::InsecureServerCredentials()); builder.RegisterService(&service); diff --git a/src/objstore.h b/src/objstore.h index c45eb7f5c..27ba69df3 100644 --- a/src/objstore.h +++ b/src/objstore.h @@ -1,17 +1,16 @@ -#ifndef ORCHESTRA_OBJSTORE_SERVER_H -#define ORCHESTRA_OBJSTORE_SERVER_H +#ifndef ORCHESTRA_OBJSTORE_H +#define ORCHESTRA_OBJSTORE_H #include #include +#include #include -#include #include -using namespace boost::interprocess; - #include "orchestra/orchestra.h" #include "orchestra.grpc.pb.h" #include "types.pb.h" +#include "ipc.h" using grpc::Server; using grpc::ServerBuilder; @@ -29,34 +28,32 @@ public: static Status upload_data_to(slice data, ObjRef objref, ObjStore::Stub& stub); }; -struct shared_object { - std::string name; - std::shared_ptr memory; - slice ptr; -}; - class ObjStoreService final : public ObjStore::Service { public: ObjStoreService(const std::string& objstore_address, std::shared_ptr scheduler_channel); - ~ObjStoreService(); - Status DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) override; + // Status DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) override; + // Status StreamObj(ServerContext* context, ServerReader* reader, AckReply* reply) override; Status ObjStoreDebugInfo(ServerContext* context, const ObjStoreDebugInfoRequest* request, ObjStoreDebugInfoReply* reply) override; - Status GetObj(ServerContext* context, const GetObjRequest* request, GetObjReply* reply) override; - Status StreamObj(ServerContext* context, ServerReader* reader, AckReply* reply) override; + void start_objstore_service(); private: - void allocate_memory(ObjRef objref, size_t size); // check if we already connected to the other objstore, if yes, return reference to connection, otherwise connect ObjStore::Stub& get_objstore_stub(const std::string& objstore_address); + void process_requests(); - std::vector memory_names_; - std::unordered_map memory_; + std::string objstore_address_; + ObjStoreId objstoreid_; // id of this objectstore in the scheduler object store table + MemorySegmentPool segmentpool_; + std::vector > memory_; // object reference -> (memory address, memory finalized?) std::mutex memory_lock_; - size_t page_size = mapped_region::get_page_size(); std::unordered_map> objstores_; std::mutex objstores_lock_; std::unique_ptr scheduler_stub_; - ObjStoreId objstoreid_; // id of this objectstore in the scheduler object store table + std::vector > pull_queue_; + std::mutex pull_queue_lock_; + MessageQueue recv_queue_; + std::vector > send_queues_; // workerid -> queue + std::thread communicator_thread_; }; #endif diff --git a/src/worker.cc b/src/worker.cc index 387165fff..7c8d33e49 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -49,6 +49,9 @@ void Worker::register_worker(const std::string& worker_address, const std::strin ClientContext context; Status status = scheduler_stub_->RegisterWorker(&context, request, &reply); workerid_ = reply.workerid(); + request_obj_queue_.connect(std::string("queue:") + objstore_address + std::string(":obj"), false); + std::string queue_name = std::string("queue:") + objstore_address + std::string(":worker:") + std::to_string(workerid_) + std::string(":obj"); + receive_obj_queue_.connect(queue_name, true); return; } @@ -75,41 +78,35 @@ ObjRef Worker::push_object(const Obj* obj) { } slice Worker::get_object(ObjRef objref) { - ClientContext context; - GetObjRequest request; - request.set_objref(objref); - GetObjReply reply; - objstore_stub_->GetObj(&context, request, &reply); - segment_ = managed_shared_memory(open_only, reply.bucket().c_str()); + ObjRequest request; + request.workerid = workerid_; + request.type = ObjRequestType::GET; + request.objref = objref; + request_obj_queue_.send(&request); + ObjHandle result; + receive_obj_queue_.receive(&result); slice slice; - slice.data = static_cast(segment_.get_address_from_handle(reply.handle())); - slice.len = reply.size(); + slice.data = segmentpool_.get_address(result); + slice.len = result.size(); return slice; } -// TODO: Do this with shared memory +// TODO(pcm): More error handling void Worker::put_object(ObjRef objref, const Obj* obj) { - ObjChunk chunk; std::string data; - obj->SerializeToString(&data); - size_t totalsize = data.size(); - ClientContext context; - AckReply reply; - std::unique_ptr > writer( - objstore_stub_->StreamObj(&context, &reply)); - const char* head = data.c_str(); - for (size_t i = 0; i < data.length(); i += CHUNK_SIZE) { - chunk.set_objref(objref); - chunk.set_totalsize(totalsize); - chunk.set_data(head + i, std::min(CHUNK_SIZE, data.length() - i)); - if (!writer->Write(chunk)) { - ORCH_LOG(ORCH_FATAL, "write failed during put_object"); - // TODO(pcm): better error handling - } - } - writer->WritesDone(); - Status status = writer->Finish(); - // TODO(pcm): error handling + obj->SerializeToString(&data); // TODO(pcm): get rid of this serialization + ObjRequest request; + request.workerid = workerid_; + request.type = ObjRequestType::ALLOC; + request.objref = objref; + request.size = data.size(); + request_obj_queue_.send(&request); + ObjHandle result; + receive_obj_queue_.receive(&result); + char* target = segmentpool_.get_address(result); + std::memcpy(target, &data[0], data.size()); + request.type = ObjRequestType::DONE; + request_obj_queue_.send(&request); } void Worker::register_function(const std::string& name, size_t num_return_vals) { @@ -128,13 +125,11 @@ Call* Worker::receive_next_task() { unsigned int priority; message_queue::size_type recvd_size; Call* call; - while (true) { - receive_queue_->receive(&call, sizeof(Call*), recvd_size, priority); - return call; - } + receive_queue_->receive(&call, sizeof(Call*), recvd_size, priority); + return call; } catch(interprocess_exception &ex){ - std::cout << ex.what() << std::endl; + ORCH_LOG(ORCH_FATAL, ex.what()); } } diff --git a/src/worker.h b/src/worker.h index 50b0d8a31..3281fb2d1 100644 --- a/src/worker.h +++ b/src/worker.h @@ -20,6 +20,7 @@ using grpc::Status; #include "orchestra.grpc.pb.h" #include "orchestra/orchestra.h" +#include "ipc.h" using grpc::Channel; using grpc::ClientContext; @@ -66,11 +67,13 @@ class Worker { std::unique_ptr scheduler_stub_; std::unique_ptr objstore_stub_; std::thread worker_server_thread_; - std::thread other_thread_; std::unique_ptr receive_queue_; managed_shared_memory segment_; WorkerId workerid_; std::string worker_address_; + MessageQueue request_obj_queue_; + MessageQueue receive_obj_queue_; + MemorySegmentPool segmentpool_; }; #endif diff --git a/test/runtest.py b/test/runtest.py index 17877435c..47790343c 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -147,11 +147,11 @@ class ObjStoreTest(unittest.TestCase): self.assertEqual(result, data) # pushing an object, shipping it to another worker, and pulling it shouldn't change it - for data in ["h", "h" * 10000, 0, 0.0]: - objref = orchpy.push(data, worker1) - response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.val, objstore_address=address(IP_ADDRESS, objstore2_port)), TIMEOUT_SECONDS) - result = orchpy.pull(objref, worker2) - self.assertEqual(result, data) + # for data in ["h", "h" * 10000, 0, 0.0]: + # objref = worker.push(data, worker1) + # response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.val, objstore_address=address(IP_ADDRESS, objstore2_port)), TIMEOUT_SECONDS) + # result = worker.pull(objref, worker2) + # self.assertEqual(result, data) services.cleanup() diff --git a/test/shell.py b/test/shell.py index 13c58ee18..adb608eb5 100644 --- a/test/shell.py +++ b/test/shell.py @@ -16,7 +16,7 @@ TIMEOUT_SECONDS = 5 parser = argparse.ArgumentParser(description='Parse addresses for the worker to connect to.') parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address") parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address") -parser.add_argument("--worker-address", default="127.0.0.1:40001", type=str, help="the worker's address") +parser.add_argument("--worker-address", default="127.0.0.1:30001", type=str, help="the worker's address") @orchpy.distributed([str], [str]) def print_string(string):