support multiple object stores, part one

Robert Nishihara 2016-04-24 19:06:14 -07:00
parent a12e6fd373
commit 6b846d2bc0
8 changed files with 111 additions and 56 deletions

View file

@@ -14,11 +14,11 @@ class Worker(object):
def put_object(self, objref, value):
"""Put `value` in the local object store with objref `objref`. This assumes that the value for `objref` has not yet been placed in the local object store."""
if type(value) == np.ndarray:
orchpy.lib.put_arrow(self.handle, objref, value)
else:
object_capsule, contained_objrefs = serialization.serialize(self.handle, value) # contained_objrefs is a list of the objrefs contained in object_capsule
orchpy.lib.put_object(self.handle, objref, object_capsule, contained_objrefs)
# if type(value) == np.ndarray:
# orchpy.lib.put_arrow(self.handle, objref, value)
# else:
object_capsule, contained_objrefs = serialization.serialize(self.handle, value) # contained_objrefs is a list of the objrefs contained in object_capsule
orchpy.lib.put_object(self.handle, objref, object_capsule, contained_objrefs)
def get_object(self, objref):
"""
@@ -27,11 +27,11 @@ class Worker(object):
WARNING: get_object can only be called on a canonical objref.
"""
if orchpy.lib.is_arrow(self.handle, objref):
return orchpy.lib.get_arrow(self.handle, objref)
else:
object_capsule = orchpy.lib.get_object(self.handle, objref)
return serialization.deserialize(self.handle, object_capsule)
# if orchpy.lib.is_arrow(self.handle, objref):
# return orchpy.lib.get_arrow(self.handle, objref)
# else:
object_capsule = orchpy.lib.get_object(self.handle, objref)
return serialization.deserialize(self.handle, object_capsule)
def alias_objrefs(self, alias_objref, target_objref):
"""Make `alias_objref` refer to the same object that `target_objref` refers to."""

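With the Arrow fast path commented out above, every value a worker puts, numpy arrays included, now goes through serialization.serialize and orchpy.lib.put_object. Below is a minimal sketch of what this looks like at the public API level, modeled on the push/pull calls in the tests later in this commit; the cluster setup call and the way the worker handles are obtained are assumptions, not the exact API.

# Hedged sketch of the round trip; push/pull and the w1/w2 worker handles
# mirror the test code in this commit, while start_cluster's return value
# is an assumption.
import numpy as np
import orchpy.services as services  # hypothetical module path
import orchpy.worker as worker

# hypothetical setup: two object stores, a few workers per store
w1, w2 = services.start_cluster(return_drivers=True, num_objstores=2,
                                num_workers_per_objstore=3)[:2]

for value in [np.zeros([10, 20]), ("a", ("b", 3)), [1, 2, 3, "a", (1, 2)]]:
    objref = worker.push(value, w1)   # serialize + put_object on w1's store
    result = worker.pull(objref, w2)  # shipped to w2's store on demand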
View file

@@ -149,10 +149,10 @@ message SchedulerInfoReply {
// Object stores
service ObjStore {
// Request to deliver an object to another object store (called by the scheduler)
rpc DeliverObj(DeliverObjRequest) returns (AckReply);
// Tell the object store to begin pulling an object from another object store (called by the scheduler)
rpc StartDelivery(StartDeliveryRequest) returns (AckReply);
// Accept incoming data from another object store, as a stream of object chunks
rpc StreamObj(stream ObjChunk) returns (AckReply);
rpc StreamObjTo(StreamObjToRequest) returns (stream ObjChunk);
// Notify the object store about objref aliasing. This is called by the scheduler
rpc NotifyAlias(NotifyAliasRequest) returns (AckReply);
// Tell the object store to deallocate an object held by the object store. This is called by the scheduler.
@@ -161,8 +161,8 @@ service ObjStore {
rpc ObjStoreInfo(ObjStoreInfoRequest) returns (ObjStoreInfoReply);
}
message DeliverObjRequest {
string objstore_address = 1; // Object store to deliver the object to
message StartDeliveryRequest {
string objstore_address = 1; // Object store to pull the object from
uint64 objref = 2; // Reference of object that gets delivered
}
@@ -174,9 +174,13 @@ message RegisterObjReply {
uint64 handle = 1; // Handle to memory segment where object is stored
}
message ObjChunk {
message StreamObjToRequest {
uint64 objref = 1; // Object reference of the object being streamed
uint64 totalsize = 2; // Total size of the object
}
message ObjChunk {
uint64 total_size = 1; // Total size of the object
uint64 metadata_offset = 2; // Offset of the arrow metadata
bytes data = 3; // Data for this chunk of the object
}
@@ -193,12 +197,6 @@ message GetObjRequest {
uint64 objref = 1; // Object reference of the object being requested by the worker
}
message GetObjReply {
string bucket = 1; // Name of the shared memory segment where the object is stored
uint64 handle = 2; // Shared memory pointer to the object
uint64 size = 3; // Total size of the object in bytes
}
// These messages are for getting information about the object store state
message ObjStoreInfoRequest {

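The proto change above reverses the direction of delivery: instead of the scheduler telling the source store to DeliverObj to the destination, it now tells the destination store to StartDelivery, and the destination calls StreamObjTo on the source, which streams the object back as ObjChunks. Here is a sketch of the new scheduler-side call in Python, written in the same style as the old DeliverObj call that survives (commented out) in the tests below; the stub construction is elided, and the generated module name orchestra_pb2 is taken from that test code.

# Sketch of the new pull-based delivery RPC; message and field names come
# from the .proto hunk above, the call style from the old test code below.
import orchestra_pb2

TIMEOUT_SECONDS = 5

def start_delivery(dst_objstore_stub, objref, src_objstore_address):
    # The destination store is told where to pull from; it then opens a
    # StreamObjTo stream against the source and receives ObjChunk messages.
    request = orchestra_pb2.StartDeliveryRequest(
        objref=objref,                          # canonical objref to deliver
        objstore_address=src_objstore_address)  # store to pull the object from
    return dst_objstore_stub.StartDelivery(request, TIMEOUT_SECONDS)

One payoff of the pull-based design is that the destination store can preallocate space for the object (the new PRE_ALLOCED status below) and ignore duplicate StartDelivery requests for an objref it is already receiving.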
View file

@@ -31,9 +31,9 @@ MemorySegmentPool::MemorySegmentPool(ObjStoreId objstoreid, bool create) : objst
// creates a memory segment if it is not already there; if the pool is in create mode,
// space is allocated; if it is in open mode, the shared memory is mapped into the process
void MemorySegmentPool::open_segment(SegmentId segmentid, size_t size) {
ORCH_LOG(ORCH_DEBUG, "opening segmentid " << segmentid);
ORCH_LOG(ORCH_DEBUG, "Opening segmentid " << segmentid << " on object store " << objstoreid_ << " with create_mode_ = " << create_mode_);
if (segmentid != segments_.size() && create_mode_) {
ORCH_LOG(ORCH_FATAL, "Attempting to open segmentid " << segmentid << " on the object store, but segments_.size() = " << segments_.size());
ORCH_LOG(ORCH_FATAL, "Object store " << objstoreid_ << " is attempting to open segmentid " << segmentid << " on the object store, but segments_.size() = " << segments_.size());
}
if (segmentid >= segments_.size()) { // resize and initialize segments_
int current_size = segments_.size();
@@ -90,7 +90,12 @@ void MemorySegmentPool::deallocate(ObjHandle pointer) {
// returns the address of the object referred to by the handle; this needs to be called
// in the process that will use the address
uint8_t* MemorySegmentPool::get_address(ObjHandle pointer) {
open_segment(pointer.segmentid());
if (create_mode_ && segments_[pointer.segmentid()].second != SegmentStatusType::OPENED) {
ORCH_LOG(ORCH_FATAL, "Object store " << objstoreid_ << " is attempting to call get_address on segmentid " << pointer.segmentid() << ", which has not been opened yet.");
}
if (!create_mode_) {
open_segment(pointer.segmentid());
}
managed_shared_memory* segment = segments_[pointer.segmentid()].first.get();
return static_cast<uint8_t*>(segment->get_address_from_handle(pointer.ipcpointer()));
}

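The get_address change above makes the contract explicit: the creating process (the object store itself) must already have the segment open, while processes that merely open the pool still map segments lazily. The repo's implementation uses Boost managed_shared_memory; purely as an illustration of the same create-versus-open pattern, here is a sketch using Python's multiprocessing.shared_memory (3.8+), which is an analogy and not the API used here.

# Create/open shared-memory pattern, analogous to create_mode_ vs open mode
# in MemorySegmentPool (illustration only; the repo uses Boost in C++).
from multiprocessing import shared_memory

# "create mode": the store allocates the segment eagerly
seg = shared_memory.SharedMemory(name="orch_segment0", create=True, size=1024)
seg.buf[:5] = b"hello"

# "open mode": another process maps the existing segment on first use
view = shared_memory.SharedMemory(name="orch_segment0", create=False)
assert bytes(view.buf[:5]) == b"hello"

view.close()
seg.close()
seg.unlink()  # the creator is responsible for removing the segment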
View file

@@ -17,40 +17,50 @@ using grpc::ServerBuilder;
using grpc::ServerReader;
using grpc::ServerContext;
using grpc::ClientContext;
using grpc::ClientWriter;
using grpc::ServerWriter;
using grpc::ClientReader;
using grpc::Status;
using grpc::Channel;
class ObjStoreClient {
public:
static const size_t CHUNK_SIZE;
static Status upload_data_to(slice data, ObjRef objref, ObjStore::Stub& stub);
};
enum MemoryStatusType {READY = 0, NOT_READY = 1, DEALLOCATED = 2, NOT_PRESENT = 3};
// READY: This is used to indicate that the object has been copied from a
// worker and is ready to be used.
// NOT_READY: This is used to indicate that memory has been allocated for the
// object, but the object hasn't been copied from a worker yet.
// DEALLOCATED: This is used to indicate that the object has been deallocated.
// NOT_PRESENT: This is used to indicate that space has not been allocated for
// this object in this object store.
// PRE_ALLOCED: This is used to indicate that the memory has not yet been
// allocated, but it will be allocated soon. This is set when we call
// StartDelivery.
enum MemoryStatusType {READY = 0, NOT_READY = 1, DEALLOCATED = 2, NOT_PRESENT = 3, PRE_ALLOCED = 4};
class ObjStoreService final : public ObjStore::Service {
public:
ObjStoreService(const std::string& objstore_address, std::shared_ptr<Channel> scheduler_channel);
// Status DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) override;
// Status StreamObj(ServerContext* context, ServerReader<ObjChunk>* reader, AckReply* reply) override;
Status StartDelivery(ServerContext* context, const StartDeliveryRequest* request, AckReply* reply) override;
Status StreamObjTo(ServerContext* context, const StreamObjToRequest* request, ServerWriter<ObjChunk>* writer) override;
Status NotifyAlias(ServerContext* context, const NotifyAliasRequest* request, AckReply* reply) override;
Status DeallocateObject(ServerContext* context, const DeallocateObjectRequest* request, AckReply* reply) override;
Status ObjStoreInfo(ServerContext* context, const ObjStoreInfoRequest* request, ObjStoreInfoReply* reply) override;
void start_objstore_service();
private:
void pull_data_from(ObjRef objref, ObjStore::Stub& stub);
// Check whether we are already connected to the other objstore; if so, return a reference to the connection, otherwise connect
ObjStore::Stub& get_objstore_stub(const std::string& objstore_address);
void process_worker_request(const ObjRequest request);
void process_objstore_request(const ObjRequest request);
void process_requests();
void process_pulls_for_objref(ObjRef objref);
ObjHandle alloc(ObjRef objref, size_t size);
void object_ready(ObjRef objref, size_t metadata_offset);
static const size_t CHUNK_SIZE;
std::string objstore_address_;
ObjStoreId objstoreid_; // id of this objectstore in the scheduler object store table
std::shared_ptr<MemorySegmentPool> segmentpool_;
std::mutex segmentpool_lock_;
std::vector<std::pair<ObjHandle, MemoryStatusType> > memory_; // object reference -> (memory address, memory status)
std::mutex memory_lock_;
std::unordered_map<std::string, std::unique_ptr<ObjStore::Stub>> objstores_;
@@ -58,9 +68,13 @@ private:
std::unique_ptr<Scheduler::Stub> scheduler_stub_;
std::vector<std::pair<WorkerId, ObjRef> > pull_queue_;
std::mutex pull_queue_lock_;
MessageQueue<ObjRequest> recv_queue_;
std::vector<MessageQueue<ObjHandle> > send_queues_; // workerid -> queue
MessageQueue<ObjRequest> recv_queue_; // This queue is used by workers to send tasks to the object store.
std::vector<MessageQueue<ObjHandle> > send_queues_; // This maps workerid -> queue. The object store uses these queues to send replies to the relevant workers.
std::thread communicator_thread_;
std::vector<std::shared_ptr<std::thread> > delivery_threads_; // TODO(rkn): document
// TODO(rkn): possibly add lock, and properly remove these threads from the delivery_threads_ when the deliveries are done
};
#endif

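The new PRE_ALLOCED status completes a small per-object state machine inside each store. A sketch of the lifecycle as the comments above describe it follows; note that the transition set is inferred from those comments rather than taken from the code.

# Per-object lifecycle in an object store, inferred from the enum comments.
from enum import Enum

class MemoryStatus(Enum):
    NOT_PRESENT = 0   # no space allocated for this object in this store
    PRE_ALLOCED = 1   # StartDelivery was called; allocation is coming
    NOT_READY = 2     # memory allocated, object not yet copied in
    READY = 3         # object copied in and usable
    DEALLOCATED = 4   # object has been deallocated

ALLOWED_TRANSITIONS = {
    MemoryStatus.NOT_PRESENT: {MemoryStatus.PRE_ALLOCED, MemoryStatus.NOT_READY},
    MemoryStatus.PRE_ALLOCED: {MemoryStatus.NOT_READY},
    MemoryStatus.NOT_READY:   {MemoryStatus.READY},
    MemoryStatus.READY:       {MemoryStatus.DEALLOCATED},
    MemoryStatus.DEALLOCATED: set(),
}

def transition(current, nxt):
    # raise on a transition the comments above do not describe
    if nxt not in ALLOWED_TRANSITIONS[current]:
        raise ValueError("illegal transition %s -> %s" % (current, nxt))
    return nxt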
View file

@@ -191,8 +191,14 @@ Status SchedulerService::SchedulerInfo(ServerContext* context, const SchedulerIn
return Status::OK;
}
// TODO(rkn): This could execute multiple times with the same arguments before
// the delivery finishes, but we only want it to happen once. Currently, the
// redundancy is handled by the object store, which will only execute the
// delivery once. However, we may want to handle it in the scheduler in the
// future.
//
// deliver_object assumes that the aliasing for objref has already been completed. That is, has_canonical_objref(objref) == true
void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to) {
// deliver_object assumes that the aliasing for objref has already been completed. That is, has_canonical_objref(objref) == true
if (from == to) {
ORCH_LOG(ORCH_FATAL, "attempting to deliver objref " << objref << " from objstore " << from << " to itself.");
}
@@ -201,12 +207,12 @@ void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId
}
ClientContext context;
AckReply reply;
DeliverObjRequest request;
StartDeliveryRequest request;
ObjRef canonical_objref = get_canonical_objref(objref);
request.set_objref(canonical_objref);
std::lock_guard<std::mutex> lock(objstores_lock_);
request.set_objstore_address(objstores_[to].address);
objstores_[from].objstore_stub->DeliverObj(&context, request, &reply);
request.set_objstore_address(objstores_[from].address);
objstores_[to].objstore_stub->StartDelivery(&context, request, &reply);
}
void SchedulerService::schedule() {
@@ -403,14 +409,16 @@ void SchedulerService::get_info(const SchedulerInfoRequest& request, SchedulerIn
}
// pick_objstore assumes that objtable_lock_ has been acquired
// pick_objstore must be called with a canonical_objref
ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) {
// pick_objstore must be called with a canonical_objref
std::mt19937 rng;
if (!is_canonical(canonical_objref)) {
ORCH_LOG(ORCH_FATAL, "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")");
}
std::uniform_int_distribution<int> uni(0, objtable_[canonical_objref].size() - 1);
return uni(rng);
ObjStoreId objstoreid = objtable_[canonical_objref][uni(rng)];
return objstoreid;
}
bool SchedulerService::is_canonical(ObjRef objref) {
@@ -433,7 +441,7 @@ void SchedulerService::perform_pulls() {
continue;
}
ObjRef canonical_objref = get_canonical_objref(objref);
ORCH_LOG(ORCH_DEBUG, "attempting to pull objref " << pull.second << " with canonical objref " << canonical_objref);
ORCH_LOG(ORCH_DEBUG, "attempting to pull objref " << pull.second << " with canonical objref " << canonical_objref << " to objstore " << get_store(workerid));
objtable_lock_.lock();
int num_stores = objtable_[canonical_objref].size();
@@ -570,16 +578,17 @@ void SchedulerService::deallocate_object(ObjRef canonical_objref) {
// so the lock must be acquired outside of these methods (it is acquired in
// DecrementRefCount).
ORCH_LOG(ORCH_REFCOUNT, "Deallocating canonical_objref " << canonical_objref << ".");
ClientContext context;
AckReply reply;
DeallocateObjectRequest request;
request.set_canonical_objref(canonical_objref);
{
std::lock_guard<std::mutex> objtable_lock(objtable_lock_);
auto &objstores = objtable_[canonical_objref];
std::lock_guard<std::mutex> objstores_lock(objstores_lock_); // TODO(rkn): Should this be inside the for loop instead?
for (int i = 0; i < objstores.size(); ++i) {
ClientContext context;
AckReply reply;
DeallocateObjectRequest request;
request.set_canonical_objref(canonical_objref);
ObjStoreId objstoreid = objstores[i];
ORCH_LOG(ORCH_REFCOUNT, "Attempting to deallocate canonical_objref " << canonical_objref << " from objstore " << objstoreid);
objstores_[objstoreid].objstore_stub->DeallocateObject(&context, request, &reply);
}
}

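One detail worth noting in the deallocation hunk above: the ClientContext, AckReply, and DeallocateObjectRequest now live inside the loop because a grpc::ClientContext can only be used for a single call, so each per-store RPC needs fresh objects. A sketch of the broadcast shape in Python, with stub construction assumed as in the earlier sketch:

# One request per object store that holds the object; objtable and the
# stub map are assumptions standing in for the scheduler's tables.
import orchestra_pb2

TIMEOUT_SECONDS = 5

def deallocate_object(canonical_objref, objtable, objstore_stubs):
    # objtable: canonical objref -> list of objstore ids holding the object
    for objstoreid in objtable[canonical_objref]:
        request = orchestra_pb2.DeallocateObjectRequest(
            canonical_objref=canonical_objref)
        objstore_stubs[objstoreid].DeallocateObject(request, TIMEOUT_SECONDS)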
View file

@@ -156,7 +156,7 @@ std::shared_ptr<arrow::Array> read_flat_array(BufferMemorySource* source, int64_
std::shared_ptr<ipc::RowBatchReader> reader;
Status s = ipc::RowBatchReader::Open(source, metadata_offset, &reader);
if (!s.ok()) {
ORCH_LOG(ORCH_FATAL, s.ToString());
ORCH_LOG(ORCH_FATAL, "Error in read_flat_array: " << s.ToString());
}
auto field = std::make_shared<arrow::Field>("data", npy_traits<NpyType>::primitive_type);
std::shared_ptr<arrow::Schema> schema(new arrow::Schema({field}));

View file

@@ -81,7 +81,7 @@ class ArraysDistTest(unittest.TestCase):
def testMethods(self):
test_dir = os.path.dirname(os.path.abspath(__file__))
test_path = os.path.join(test_dir, "testrecv.py")
services.start_cluster(return_drivers=False, num_workers_per_objstore=8, worker_path=test_path)
services.start_cluster(return_drivers=False, num_objstores=2, num_workers_per_objstore=8, worker_path=test_path)
x = dist.zeros([9, 25, 51], "float")
y = dist.assemble(x)

View file

@@ -78,11 +78,40 @@ class ObjStoreTest(unittest.TestCase):
self.assertEqual(result, data)
# pushing an object, shipping it to another worker, and pulling it shouldn't change it
# for data in ["h", "h" * 10000, 0, 0.0]:
# objref = worker.push(data, worker1)
# response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.val, objstore_address=address(IP_ADDRESS, objstore2_port)), TIMEOUT_SECONDS)
# result = worker.pull(objref, worker2)
# self.assertEqual(result, data)
for data in ["h", "h" * 10000, 0, 0.0, [1, 2, 3, "a", (1, 2)], ("a", ("b", 3))]:
objref = worker.push(data, w1)
result = worker.pull(objref, w2)
self.assertEqual(result, data)
# pushing an array, shipping it to another worker, and pulling it shouldn't change it
for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25])]:
objref = worker.push(data, w1)
result = worker.pull(objref, w2)
self.assertTrue(np.alltrue(result == data))
"""
# pulling multiple times shouldn't matter
for data in [np.zeros([10, 20]), np.random.normal(size=[45, 25]), np.zeros([10, 20], dtype=np.dtype("float64")), np.zeros([10, 20], dtype=np.dtype("float32")), np.zeros([10, 20], dtype=np.dtype("int64")), np.zeros([10, 20], dtype=np.dtype("int32"))]:
objref = worker.push(data, w1)
result = worker.pull(objref, w2)
result = worker.pull(objref, w2)
result = worker.pull(objref, w2)
self.assertTrue(np.alltrue(result == data))
"""
# shipping a numpy array inside something else should be fine
data = ("a", np.random.normal(size=[10, 10]))
objref = worker.push(data, w1)
result = worker.pull(objref, w2)
self.assertTrue(data[0] == result[0])
self.assertTrue(np.alltrue(data[1] == result[1]))
# shipping a numpy array inside a list should also be fine
data = ["a", np.random.normal(size=[10, 10])]
objref = worker.push(data, w1)
result = worker.pull(objref, w2)
self.assertTrue(data[0] == result[0])
self.assertTrue(np.alltrue(data[1] == result[1]))
services.cleanup()