From a28920bb24c82d76d2318303e0112671f0cf8d17 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Fri, 8 Apr 2016 12:58:08 -0700 Subject: [PATCH] Alias (#38) * implement objref aliasing * updates --- lib/orchpy/orchpy/worker.py | 36 ++++-- protos/orchestra.proto | 14 +++ src/ipc.h | 8 +- src/objstore.cc | 161 ++++++++++++++++++-------- src/objstore.h | 4 + src/orchpylib.cc | 13 +++ src/scheduler.cc | 222 ++++++++++++++++++++++++++++++++---- src/scheduler.h | 28 ++++- src/worker.cc | 14 ++- src/worker.h | 2 + test/runtest.py | 17 ++- test/shell.py | 13 +++ test/testrecv.py | 5 +- 13 files changed, 441 insertions(+), 96 deletions(-) diff --git a/lib/orchpy/orchpy/worker.py b/lib/orchpy/orchpy/worker.py index e71625796..a8daa8eac 100644 --- a/lib/orchpy/orchpy/worker.py +++ b/lib/orchpy/orchpy/worker.py @@ -22,13 +22,22 @@ class Worker(object): orchpy.lib.put_object(self.handle, objref, object_capsule) def get_object(self, objref): - """Return the value from the local object store for objref `objref`. This will block until the value for `objref` has been written to the local object store.""" + """ + Return the value from the local object store for objref `objref`. This will + block until the value for `objref` has been written to the local object store. + + WARNING: get_object can only be called on a canonical objref. + """ if orchpy.lib.is_arrow(self.handle, objref): return orchpy.lib.get_arrow(self.handle, objref) else: object_capsule = orchpy.lib.get_object(self.handle, objref) return serialization.deserialize(object_capsule) + def alias_objrefs(self, alias_objref, target_objref): + """Make `alias_objref` refer to the same object that `target_objref` refers to.""" + orchpy.lib.alias_objrefs(self.handle, alias_objref, target_objref) + def register_function(self, function): """Notify the scheduler that this worker can execute the function with name `func_name`. Store the function `function` locally.""" orchpy.lib.register_function(self.handle, function.func_name, len(function.return_types)) @@ -105,14 +114,15 @@ def distributed(arg_types, return_types, worker=global_worker): # helper method, this should not be called by the user def check_return_values(function, result): if len(function.return_types) == 1: - if not isinstance(result, function.return_types[0]): - raise Exception("The @distributed decorator for function {} expects one return value with type {}, but {} returned a {}.".format(function.__name__, function.return_types[0], function.__name__, type(result))) + result = (result,) + # if not isinstance(result, function.return_types[0]): + # raise Exception("The @distributed decorator for function {} expects one return value with type {}, but {} returned a {}.".format(function.__name__, function.return_types[0], function.__name__, type(result))) else: if len(result) != len(function.return_types): raise Exception("The @distributed decorator for function {} has {} return values with types {}, but {} returned {} values.".format(function.__name__, len(function.return_types), function.return_types, function.__name__, len(result))) for i in range(len(result)): - if not isinstance(result[i], function.return_types[i]): - raise Exception("The {}th return value for function {} has type {}, but the @distributed decorator expected a return value of type {}.".format(i, function.__name__, type(result[i]), function.return_types[i])) + if (not isinstance(result[i], function.return_types[i])) and (not isinstance(result[i], orchpy.lib.ObjRef)): + raise Exception("The {}th return value for function {} has type {}, but the @distributed decorator expected a return value of type {} or an ObjRef.".format(i, function.__name__, type(result[i]), function.return_types[i])) # helper method, this should not be called by the user def check_arguments(function, args): @@ -132,7 +142,7 @@ def check_arguments(function, args): else: assert False, "This code should be unreachable." - if type(arg) == orchpy.lib.ObjRef: + if isinstance(arg, orchpy.lib.ObjRef): # TODO(rkn): When we have type information in the ObjRef, do type checking here. pass else: @@ -161,7 +171,7 @@ def get_arguments_for_execution(function, args, worker=global_worker): else: assert False, "This code should be unreachable." - if type(arg) == orchpy.lib.ObjRef: + if isinstance(arg, orchpy.lib.ObjRef): # get the object from the local object store print "Getting argument {} for function {}.".format(i, function.__name__) argument = worker.get_object(arg) @@ -178,7 +188,13 @@ def get_arguments_for_execution(function, args, worker=global_worker): # helper method, this should not be called by the user def store_outputs_in_objstore(objrefs, outputs, worker=global_worker): if len(objrefs) == 1: - worker.put_object(objrefs[0], outputs) - else: - for i in range(len(objrefs)): + outputs = (outputs,) + + for i in range(len(objrefs)): + if isinstance(outputs[i], orchpy.lib.ObjRef): + # An ObjRef is being returned, so we must alias objrefs[i] so that it refers to the same object that outputs[i] refers to + print "Aliasing objrefs {} and {}".format(objrefs[i].val, outputs[i].val) + worker.alias_objrefs(objrefs[i], outputs[i]) + pass + else: worker.put_object(objrefs[i], outputs[i]) diff --git a/protos/orchestra.proto b/protos/orchestra.proto index 6eb7d90c0..b69ec0fd6 100644 --- a/protos/orchestra.proto +++ b/protos/orchestra.proto @@ -33,6 +33,8 @@ service Scheduler { rpc PushObj(PushObjRequest) returns (PushObjReply); // Request delivery of an object from an object store that holds the object to the local object store rpc RequestObj(RequestObjRequest) returns (AckReply); + // Used by the worker to tell the scheduler that two objrefs should refer to the same object + rpc AliasObjRefs(AliasObjRefsRequest) returns (AckReply); // Used by an object store to tell the scheduler that an object is ready (i.e. has been finalized and can be shared) rpc ObjReady(ObjReadyRequest) returns (AckReply); // Used by the worker to report back and ask for more work @@ -88,6 +90,11 @@ message PushObjReply { uint64 objref = 1; // Object reference assigned by the scheduler to the object } +message AliasObjRefsRequest { + uint64 alias_objref = 1; // ObjRef which will be aliased + uint64 target_objref = 2; // The target ObjRef +} + message ObjReadyRequest { uint64 objref = 1; // Object reference of the object that has been finalized uint64 objstoreid = 2; // ID of the object store the object lives on @@ -124,6 +131,8 @@ service ObjStore { rpc DeliverObj(DeliverObjRequest) returns (AckReply); // Accept incoming data from another object store, as a stream of object chunks rpc StreamObj(stream ObjChunk) returns (AckReply); + // Notify the object store about objref aliasing. This is called by the scheduler + rpc NotifyAlias(NotifyAliasRequest) returns (AckReply); // Get debug info from the object store rpc ObjStoreDebugInfo(ObjStoreDebugInfoRequest) returns (ObjStoreDebugInfoReply); } @@ -147,6 +156,11 @@ message ObjChunk { bytes data = 3; // Data for this chunk of the object } +message NotifyAliasRequest { + uint64 alias_objref = 1; // The objref being aliased + uint64 canonical_objref = 2; // The canonical objref that points to the actual object +} + message GetObjRequest { uint64 objref = 1; // Object reference of the object being requested by the worker } diff --git a/src/ipc.h b/src/ipc.h index 7066793f3..69cccec7c 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -84,17 +84,19 @@ private: // worker requests an allocation from the object store // GET: workerid, objref -> objhandle: // worker requests an object from the object store -// DONE: workerid, objref -> (): +// WORKER_DONE: workerid, objref -> (): // worker tells the object store that an object has been finalized +// ALIAS_DONE: objref -> (): +// objstore tells itself that it has finalized something (perhaps an alias) -enum ObjRequestType {ALLOC = 0, GET = 1, DONE = 2}; +enum ObjRequestType {ALLOC = 0, GET = 1, WORKER_DONE = 2, ALIAS_DONE}; struct ObjRequest { WorkerId workerid; // worker that sends the request ObjRequestType type; // do we want to allocate a new object or get a handle? ObjRef objref; // object reference of the object to be returned/allocated int64_t size; // if allocate, that's the size of the object - int64_t metadata_offset; // if sending 'DONE', that's the location of the metadata relative to the beginning of the object + int64_t metadata_offset; // if sending 'WORKER_DONE', that's the location of the metadata relative to the beginning of the object }; typedef size_t SegmentId; // index into a memory segment table diff --git a/src/objstore.cc b/src/objstore.cc index 2ddaa9a10..50dd2ecba 100644 --- a/src/objstore.cc +++ b/src/objstore.cc @@ -112,57 +112,80 @@ Status ObjStoreService::StreamObj(ServerContext* context, ServerReader } */ -void ObjStoreService::process_requests() { - ObjRequest request; - while (true) { - recv_queue_.receive(&request); - if (request.workerid >= send_queues_.size()) { - send_queues_.resize(request.workerid + 1); - } - if (!send_queues_[request.workerid].connected()) { - std::string queue_name = std::string("queue:") + objstore_address_ + std::string(":worker:") + std::to_string(request.workerid) + std::string(":obj"); - send_queues_[request.workerid].connect(queue_name, false); - } - if (request.objref >= memory_.size()) { - memory_.resize(request.objref + 1); - memory_[request.objref].second = false; - } - switch (request.type) { - case ObjRequestType::ALLOC: { - ObjHandle reply = segmentpool_.allocate(request.size); - send_queues_[request.workerid].send(&reply); - if (request.objref >= memory_.size()) { - memory_.resize(request.objref + 1); - } - memory_[request.objref].first = reply; - memory_[request.objref].second = false; +Status ObjStoreService::NotifyAlias(ServerContext* context, const NotifyAliasRequest* request, AckReply* reply) { + // NotifyAlias assumes that the objstore already holds canonical_objref + ObjRef alias_objref = request->alias_objref(); + ObjRef canonical_objref = request->canonical_objref(); + std::lock_guard memory_lock(memory_lock_); + if (canonical_objref >= memory_.size()) { + ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not in the objstore.") + } + if (!memory_[canonical_objref].second) { + ORCH_LOG(ORCH_FATAL, "Attempting to alias objref " << alias_objref << " with objref " << canonical_objref << ", but objref " << canonical_objref << " is not ready yet in the objstore.") + } + if (alias_objref >= memory_.size()) { + memory_.resize(alias_objref + 1); + } + memory_[alias_objref].first = memory_[canonical_objref].first; + memory_[alias_objref].second = true; + + ObjRequest done_request; + done_request.type = ObjRequestType::ALIAS_DONE; + done_request.objref = alias_objref; + recv_queue_.send(&done_request); + return Status::OK; +} + +void ObjStoreService::process_objstore_request(const ObjRequest request) { + switch (request.type) { + case ObjRequestType::ALIAS_DONE: { + process_pulls_for_objref(request.objref); + } + break; + default: { + ORCH_LOG(ORCH_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + } + } +} + +void ObjStoreService::process_worker_request(const ObjRequest request) { + if (request.workerid >= send_queues_.size()) { + send_queues_.resize(request.workerid + 1); + } + if (!send_queues_[request.workerid].connected()) { + std::string queue_name = std::string("queue:") + objstore_address_ + std::string(":worker:") + std::to_string(request.workerid) + std::string(":obj"); + send_queues_[request.workerid].connect(queue_name, false); + } + if (request.objref >= memory_.size()) { + memory_.resize(request.objref + 1); + memory_[request.objref].second = false; + } + switch (request.type) { + case ObjRequestType::ALLOC: { + ObjHandle reply = segmentpool_.allocate(request.size); + send_queues_[request.workerid].send(&reply); + if (request.objref >= memory_.size()) { + memory_.resize(request.objref + 1); } - break; - case ObjRequestType::GET: { - std::pair& item = memory_[request.objref]; - if (item.second) { - send_queues_[request.workerid].send(&item.first); - } else { - std::lock_guard lock(pull_queue_lock_); - pull_queue_.push_back(std::make_pair(request.workerid, request.objref)); - } + memory_[request.objref].first = reply; + memory_[request.objref].second = false; + } + break; + case ObjRequestType::GET: { + std::pair& item = memory_[request.objref]; + if (item.second) { + send_queues_[request.workerid].send(&item.first); + } else { + std::lock_guard lock(pull_queue_lock_); + pull_queue_.push_back(std::make_pair(request.workerid, request.objref)); } - break; - case ObjRequestType::DONE: { + } + break; + case ObjRequestType::WORKER_DONE: { std::pair& item = memory_[request.objref]; item.first.set_metadata_offset(request.metadata_offset); item.second = true; - std::lock_guard pull_queue_lock(pull_queue_lock_); - for (size_t i = 0; i < pull_queue_.size(); ++i) { - if (pull_queue_[i].second == request.objref) { - ObjHandle& elem = memory_[request.objref].first; - send_queues_[pull_queue_[i].first].send(&item.first); - // Remove the pull task from the queue - std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]); - pull_queue_.pop_back(); - i -= 1; - } - } + process_pulls_for_objref(request.objref); // Tell the scheduler that the object arrived // TODO(pcm): put this in a separate thread so we don't have to pay the latency here ClientContext objready_context; @@ -173,6 +196,52 @@ void ObjStoreService::process_requests() { scheduler_stub_->ObjReady(&objready_context, objready_request, &objready_reply); } break; + default: { + ORCH_LOG(ORCH_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + } + } +} + +void ObjStoreService::process_requests() { + // TODO(rkn): Should memory_lock_ be used in this method? + ObjRequest request; + while (true) { + recv_queue_.receive(&request); + switch (request.type) { + case ObjRequestType::ALLOC: { + process_worker_request(request); + } + break; + case ObjRequestType::GET: { + process_worker_request(request); + } + break; + case ObjRequestType::WORKER_DONE: { + process_worker_request(request); + } + break; + case ObjRequestType::ALIAS_DONE: { + process_objstore_request(request); + } + break; + default: { + ORCH_LOG(ORCH_FATAL, "Attempting to process request of type " << request.type << ". This code should be unreachable."); + } + } + } +} + +void ObjStoreService::process_pulls_for_objref(ObjRef objref) { + std::pair& item = memory_[objref]; + std::lock_guard pull_queue_lock(pull_queue_lock_); + for (size_t i = 0; i < pull_queue_.size(); ++i) { + if (pull_queue_[i].second == objref) { + ObjHandle& elem = memory_[objref].first; + send_queues_[pull_queue_[i].first].send(&item.first); + // Remove the pull task from the queue + std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]); + pull_queue_.pop_back(); + i -= 1; } } } diff --git a/src/objstore.h b/src/objstore.h index 27ba69df3..0985482a7 100644 --- a/src/objstore.h +++ b/src/objstore.h @@ -34,12 +34,16 @@ public: // Status DeliverObj(ServerContext* context, const DeliverObjRequest* request, AckReply* reply) override; // Status StreamObj(ServerContext* context, ServerReader* reader, AckReply* reply) override; + Status NotifyAlias(ServerContext* context, const NotifyAliasRequest* request, AckReply* reply) override; Status ObjStoreDebugInfo(ServerContext* context, const ObjStoreDebugInfoRequest* request, ObjStoreDebugInfoReply* reply) override; void start_objstore_service(); private: // check if we already connected to the other objstore, if yes, return reference to connection, otherwise connect ObjStore::Stub& get_objstore_stub(const std::string& objstore_address); + void process_worker_request(const ObjRequest request); + void process_objstore_request(const ObjRequest request); void process_requests(); + void process_pulls_for_objref(ObjRef objref); std::string objstore_address_; ObjStoreId objstoreid_; // id of this objectstore in the scheduler object store table diff --git a/src/orchpylib.cc b/src/orchpylib.cc index 1fc194c2a..5250fdd89 100644 --- a/src/orchpylib.cc +++ b/src/orchpylib.cc @@ -586,6 +586,7 @@ PyObject* put_object(PyObject* self, PyObject* args) { } PyObject* get_object(PyObject* self, PyObject* args) { + // get_object assumes that objref is a canonical objref Worker* worker; ObjRef objref; if (!PyArg_ParseTuple(args, "O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &objref)) { @@ -607,6 +608,17 @@ PyObject* request_object(PyObject* self, PyObject* args) { Py_RETURN_NONE; } +PyObject* alias_objrefs(PyObject* self, PyObject* args) { + Worker* worker; + ObjRef alias_objref; + ObjRef target_objref; + if (!PyArg_ParseTuple(args, "O&O&O&", &PyObjectToWorker, &worker, &PyObjectToObjRef, &alias_objref, &PyObjectToObjRef, &target_objref)) { + return NULL; + } + worker->alias_objrefs(alias_objref, target_objref); + Py_RETURN_NONE; +} + PyObject* start_worker_service(PyObject* self, PyObject* args) { Worker* worker; if (!PyArg_ParseTuple(args, "O&", &PyObjectToWorker, &worker)) { @@ -630,6 +642,7 @@ static PyMethodDef OrchPyLibMethods[] = { { "get_object", get_object, METH_VARARGS, "get protocol buffer object from the local object store" }, { "get_objref", get_objref, METH_VARARGS, "register a new object reference with the scheduler" }, { "request_object" , request_object, METH_VARARGS, "request an object to be delivered to the local object store" }, + { "alias_objrefs", alias_objrefs, METH_VARARGS, "make two objrefs refer to the same object" }, { "wait_for_next_task", wait_for_next_task, METH_VARARGS, "get next task from scheduler (blocking)" }, { "remote_call", remote_call, METH_VARARGS, "call a remote function" }, { "notify_task_completed", notify_task_completed, METH_VARARGS, "notify the scheduler that a task has been completed" }, diff --git a/src/scheduler.cc b/src/scheduler.cc index aca637400..27991c912 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -56,6 +56,33 @@ Status SchedulerService::RequestObj(ServerContext* context, const RequestObjRequ return Status::OK; } +Status SchedulerService::AliasObjRefs(ServerContext* context, const AliasObjRefsRequest* request, AckReply* reply) { + ObjRef alias_objref = request->alias_objref(); + ObjRef target_objref = request->target_objref(); + + objtable_lock_.lock(); + size_t size = objtable_.size(); + objtable_lock_.unlock(); + + if (alias_objref >= size) { + ORCH_LOG(ORCH_FATAL, "internal error: no object with objref " << alias_objref << " exists"); + } + if (target_objref >= size) { + ORCH_LOG(ORCH_FATAL, "internal error: no object with objref " << target_objref << " exists"); + } + + { + std::lock_guard objstore_lock(target_objrefs_lock_); + if (target_objrefs_[alias_objref] != UNITIALIZED_ALIAS) { + ORCH_LOG(ORCH_FATAL, "internal error: attempting to alias objref " << alias_objref << " with objref " << target_objref << ", but objref " << alias_objref << " has already been aliased with objref " << target_objrefs_[alias_objref]); + } + target_objrefs_[alias_objref] = target_objref; + } + + schedule(); + return Status::OK; +} + Status SchedulerService::RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) { std::lock_guard objstore_lock(objstores_lock_); ObjStoreId objstoreid = objstores_.size(); @@ -84,8 +111,10 @@ Status SchedulerService::RegisterFunction(ServerContext* context, const Register } Status SchedulerService::ObjReady(ServerContext* context, const ObjReadyRequest* request, AckReply* reply) { - ORCH_LOG(ORCH_VERBOSE, "object " << request->objref() << " ready on store " << request->objstoreid()); - add_location(request->objref(), request->objstoreid()); + ObjRef objref = request->objref(); + ORCH_LOG(ORCH_VERBOSE, "object " << objref << " ready on store " << request->objstoreid()); + add_canonical_objref(objref); + add_location(objref, request->objstoreid()); schedule(); return Status::OK; } @@ -106,13 +135,18 @@ Status SchedulerService::SchedulerDebugInfo(ServerContext* context, const Schedu } void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId to) { + // deliver_object assumes that the aliasing for objref has already been completed. That is, has_canonical_objref(objref) == true if (from == to) { ORCH_LOG(ORCH_FATAL, "attempting to deliver objref " << objref << " from objstore " << from << " to itself."); } + if (!has_canonical_objref(objref)) { + ORCH_LOG(ORCH_FATAL, "attempting to deliver objref " << objref << ", but this objref does not yet have a canonical objref."); + } ClientContext context; AckReply reply; DeliverObjRequest request; - request.set_objref(objref); + ObjRef canonical_objref = get_canonical_objref(objref); + request.set_objref(canonical_objref); std::lock_guard lock(objstores_lock_); request.set_objstore_address(objstores_[to].address); objstores_[from].objstore_stub->DeliverObj(&context, request, &reply); @@ -120,20 +154,36 @@ void SchedulerService::deliver_object(ObjRef objref, ObjStoreId from, ObjStoreId void SchedulerService::schedule() { // TODO: don't recheck if nothing changed + // This method consists of three kinds of tasks, which are queued in pull_queue_, + // task_queue_, and alias_notification_queue_. They should probably be separated + // into three methods, which are all called from this method + + // See what we can do in pull_queue_ { std::lock_guard objtable_lock(objtable_lock_); std::lock_guard pull_queue_lock(pull_queue_lock_); // Complete all pull tasks that can be completed. for (int i = 0; i < pull_queue_.size(); ++i) { const std::pair& pull = pull_queue_[i]; - WorkerId workerid = pull.first; ObjRef objref = pull.second; - if (objtable_[objref].size() > 0) { - if (!std::binary_search(objtable_[objref].begin(), objtable_[objref].end(), get_store(workerid))) { + WorkerId workerid = pull.first; + if (!has_canonical_objref(objref)) { + ORCH_LOG(ORCH_DEBUG, "objref " << objref << " does not have a canonical_objref, so continuing"); + continue; + } + ObjRef canonical_objref = get_canonical_objref(objref); + ORCH_LOG(ORCH_DEBUG, "attempting to pull objref " << pull.second << " with canonical objref " << canonical_objref); + if (objtable_[canonical_objref].size() > 0) { + if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), get_store(workerid))) { // The worker's local object store does not already contain objref, so ship // it there from an object store that does have it. - ObjStoreId objstoreid = pick_objstore(objref); - deliver_object(objref, objstoreid, get_store(workerid)); + ObjStoreId objstoreid = pick_objstore(canonical_objref); + deliver_object(canonical_objref, objstoreid, get_store(workerid)); + } + { + // Notify the relevant objstore about potential aliasing when it's ready + std::lock_guard alias_notification_queue_lock(alias_notification_queue_lock_); + alias_notification_queue_.push_back(std::make_pair(get_store(workerid), std::make_pair(objref, canonical_objref))); } // Remove the pull task from the queue std::swap(pull_queue_[i], pull_queue_[pull_queue_.size() - 1]); @@ -142,6 +192,7 @@ void SchedulerService::schedule() { } } } + // See what we can do in task_queue_ { std::lock_guard fntable_lock(fntable_lock_); std::lock_guard avail_workers_lock(avail_workers_lock_); @@ -166,9 +217,26 @@ void SchedulerService::schedule() { } } } + // See what we can do in alias_notification_queue_ + { + std::lock_guard alias_notification_queue_lock(alias_notification_queue_lock_); + for (int i = 0; i < alias_notification_queue_.size(); ++i) { + const std::pair > alias_notification = alias_notification_queue_[i]; + ObjStoreId objstoreid = alias_notification.first; + ObjRef alias_objref = alias_notification.second.first; + ObjRef canonical_objref = alias_notification.second.second; + if (attempt_notify_alias(objstoreid, alias_objref, canonical_objref)) { // this locks both the objstore_ and objtable_ + // the attempt to notify the objstore of the objref aliasing succeeded, so remove the notification task from the queue + std::swap(alias_notification_queue_[i], alias_notification_queue_[alias_notification_queue_.size() - 1]); + alias_notification_queue_.pop_back(); + i -= 1; + } + } + } } void SchedulerService::submit_task(std::unique_ptr call, WorkerId workerid) { + // submit task assumes that the canonical objrefs for its arguments are all ready, that is has_canonical_objref() is true for all of the call's arguments ClientContext context; InvokeCallRequest request; InvokeCallReply reply; @@ -176,12 +244,20 @@ void SchedulerService::submit_task(std::unique_ptr call, WorkerId workerid for (size_t i = 0; i < call->arg_size(); ++i) { if (!call->arg(i).has_obj()) { ObjRef objref = call->arg(i).ref(); - ORCH_LOG(ORCH_INFO, "call contains object ref " << objref); + ObjRef canonical_objref = get_canonical_objref(objref); + { + // Notify the relevant objstore about potential aliasing when it's ready + std::lock_guard alias_notification_queue_lock(alias_notification_queue_lock_); + alias_notification_queue_.push_back(std::make_pair(get_store(workerid), std::make_pair(objref, canonical_objref))); + } + attempt_notify_alias(get_store(workerid), objref, canonical_objref); + + ORCH_LOG(ORCH_INFO, "call contains object ref " << canonical_objref); std::lock_guard objtable_lock(objtable_lock_); - auto &objstores = objtable_[call->arg(i).ref()]; + auto &objstores = objtable_[canonical_objref]; std::lock_guard workers_lock(workers_lock_); - if (!std::binary_search(objstores.begin(), objstores.end(), workers_[workerid].objstoreid)) { - deliver_object(objref, pick_objstore(objref), workers_[workerid].objstoreid); + if (!std::binary_search(objstores.begin(), objstores.end(), workers_[workerid].objstoreid)) { // TODO(rkn): replace this with get_store + deliver_object(canonical_objref, pick_objstore(canonical_objref), workers_[workerid].objstoreid); // TODO(rkn): replace this with get_store } } } @@ -194,7 +270,11 @@ bool SchedulerService::can_run(const Call& task) { for (int i = 0; i < task.arg_size(); ++i) { if (!task.arg(i).has_obj()) { ObjRef objref = task.arg(i).ref(); - if (objref >= objtable_.size() || objtable_[objref].size() == 0) { + if (!has_canonical_objref(objref)) { + return false; + } + ObjRef canonical_objref = get_canonical_objref(objref); + if (canonical_objref >= objtable_.size() || objtable_[canonical_objref].size() == 0) { return false; } } @@ -234,24 +314,46 @@ WorkerId SchedulerService::register_worker(const std::string& worker_address, co } ObjRef SchedulerService::register_new_object() { - std::lock_guard lock(objtable_lock_); - ObjRef result = objtable_.size(); + // If we don't simultaneously lock objtable_ and target_objrefs_, we will probably get errors. + std::lock_guard objtable_lock(objtable_lock_); + std::lock_guard target_objrefs_lock(target_objrefs_lock_); + ObjRef objtable_size = objtable_.size(); + ObjRef target_objrefs_size = target_objrefs_.size(); + if (objtable_size != target_objrefs_size) { + ORCH_LOG(ORCH_FATAL, "objtable_ and target_objrefs_ should have the same size, but objtable_.size() = " << objtable_size << " and target_objrefs_.size() = " << target_objrefs_size); + } objtable_.push_back(std::vector()); - return result; + target_objrefs_.push_back(UNITIALIZED_ALIAS); + return objtable_size; } -void SchedulerService::add_location(ObjRef objref, ObjStoreId objstoreid) { +void SchedulerService::add_location(ObjRef canonical_objref, ObjStoreId objstoreid) { + // add_location must be called with a canonical objref + if (!is_canonical(canonical_objref)) { + ORCH_LOG(ORCH_FATAL, "Attempting to call add_location with a non-canonical objref (objref " << canonical_objref << ")"); + } std::lock_guard objtable_lock(objtable_lock_); - if (objref >= objtable_.size()) { - ORCH_LOG(ORCH_FATAL, "trying to put object on object store that was not registered with the scheduler"); + if (canonical_objref >= objtable_.size()) { + ORCH_LOG(ORCH_FATAL, "trying to put an object in the object store that was not registered with the scheduler (objref " << canonical_objref << ")"); } // do a binary search - auto pos = std::lower_bound(objtable_[objref].begin(), objtable_[objref].end(), objstoreid); - if (pos == objtable_[objref].end() || objstoreid < *pos) { - objtable_[objref].insert(pos, objstoreid); + auto pos = std::lower_bound(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid); + if (pos == objtable_[canonical_objref].end() || objstoreid < *pos) { + objtable_[canonical_objref].insert(pos, objstoreid); } } +void SchedulerService::add_canonical_objref(ObjRef objref) { + std::lock_guard lock(target_objrefs_lock_); + if (objref >= target_objrefs_.size()) { + ORCH_LOG(ORCH_FATAL, "internal error: attempting to insert objref " << objref << " in target_objrefs_, but target_objrefs_.size() is " << target_objrefs_.size()); + } + if (target_objrefs_[objref] != UNITIALIZED_ALIAS) { + ORCH_LOG(ORCH_FATAL, "internal error: attempting to declare objref " << objref << " as a canonical objref, but target_objrefs_[objref] is already aliased with objref " << target_objrefs_[objref]); + } + target_objrefs_[objref] = objref; +} + ObjStoreId SchedulerService::get_store(WorkerId workerid) { std::lock_guard lock(workers_lock_); ObjStoreId result = workers_[workerid].objstoreid; @@ -288,12 +390,84 @@ void SchedulerService::debug_info(const SchedulerDebugInfoRequest& request, Sche avail_workers_lock_.unlock(); } -ObjStoreId SchedulerService::pick_objstore(ObjRef objref) { +ObjStoreId SchedulerService::pick_objstore(ObjRef canonical_objref) { + // pick_objstore must be called with a canonical_objref std::mt19937 rng; - std::uniform_int_distribution uni(0, objtable_[objref].size()-1); + if (!is_canonical(canonical_objref)) { + ORCH_LOG(ORCH_FATAL, "Attempting to call pick_objstore with a non-canonical objref, (objref " << canonical_objref << ")"); + } + std::uniform_int_distribution uni(0, objtable_[canonical_objref].size() - 1); return uni(rng); } +bool SchedulerService::is_canonical(ObjRef objref) { + std::lock_guard lock(target_objrefs_lock_); + if (target_objrefs_[objref] == UNITIALIZED_ALIAS) { + ORCH_LOG(ORCH_FATAL, "Attempting to call is_canonical on an objref for which aliasing is not complete or the object is not ready, target_objrefs_[objref] == UNITIALIZED_ALIAS for objref " << objref << "."); + } + return objref == target_objrefs_[objref]; +} + +bool SchedulerService::has_canonical_objref(ObjRef objref) { + std::lock_guard lock(target_objrefs_lock_); + ObjRef objref_temp = objref; + while (true) { + if (objref_temp >= target_objrefs_.size()) { + ORCH_LOG(ORCH_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); + } + if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) { + return false; + } + if (target_objrefs_[objref_temp] == objref_temp) { + return true; + } + objref_temp = target_objrefs_[objref_temp]; + } +} + +ObjRef SchedulerService::get_canonical_objref(ObjRef objref) { + // get_canonical_objref assumes that has_canonical_objref(objref) is true + std::lock_guard lock(target_objrefs_lock_); + ObjRef objref_temp = objref; + while (true) { + if (objref_temp >= target_objrefs_.size()) { + ORCH_LOG(ORCH_FATAL, "Attempting to index target_objrefs_ with objref " << objref_temp << ", but target_objrefs_.size() = " << target_objrefs_.size()); + } + if (target_objrefs_[objref_temp] == UNITIALIZED_ALIAS) { + ORCH_LOG(ORCH_FATAL, "Attempting to get canonical objref for objref " << objref << ", which aliases, objref " << objref_temp << ", but target_objrefs_[objref_temp] == UNITIALIZED_ALIAS for objref_temp = " << objref_temp << "."); + } + if (target_objrefs_[objref_temp] == objref_temp) { + return objref_temp; + } + objref_temp = target_objrefs_[objref_temp]; + ORCH_LOG(ORCH_DEBUG, "Looping in get_canonical_objref."); + } +} + +bool SchedulerService::attempt_notify_alias(ObjStoreId objstoreid, ObjRef alias_objref, ObjRef canonical_objref) { + // return true if successful and false otherwise + if (alias_objref == canonical_objref) { + // no need to do anything + return true; + } + { + std::lock_guard lock(objtable_lock_); + if (!std::binary_search(objtable_[canonical_objref].begin(), objtable_[canonical_objref].end(), objstoreid)) { + // the objstore doesn't have the object for canonical_objref yet, so it's too early to notify the objstore about the alias + return false; + } + } + ClientContext context; + AckReply reply; + NotifyAliasRequest request; + request.set_alias_objref(alias_objref); + request.set_canonical_objref(canonical_objref); + objstores_lock_.lock(); + objstores_[objstoreid].objstore_stub->NotifyAlias(&context, request, &reply); + objstores_lock_.unlock(); + return true; +} + void start_scheduler_service(const char* server_address) { SchedulerService service; ServerBuilder builder; diff --git a/src/scheduler.h b/src/scheduler.h index ec677e8f4..00e7f19d6 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -23,6 +24,8 @@ using grpc::ClientContext; using grpc::Channel; +const ObjRef UNITIALIZED_ALIAS = std::numeric_limits::max(); + struct WorkerHandle { std::shared_ptr channel; std::unique_ptr worker_stub; @@ -40,6 +43,7 @@ public: Status RemoteCall(ServerContext* context, const RemoteCallRequest* request, RemoteCallReply* reply) override; Status PushObj(ServerContext* context, const PushObjRequest* request, PushObjReply* reply) override; Status RequestObj(ServerContext* context, const RequestObjRequest* request, AckReply* reply) override; + Status AliasObjRefs(ServerContext* context, const AliasObjRefsRequest* request, AckReply* reply) override; Status RegisterObjStore(ServerContext* context, const RegisterObjStoreRequest* request, RegisterObjStoreReply* reply) override; Status RegisterWorker(ServerContext* context, const RegisterWorkerRequest* request, RegisterWorkerReply* reply) override; Status RegisterFunction(ServerContext* context, const RegisterFunctionRequest* request, AckReply* reply) override; @@ -61,6 +65,8 @@ public: ObjRef register_new_object(); // register the location of the object reference in the object table void add_location(ObjRef objref, ObjStoreId objstoreid); + // indicate that objref is a canonical objref + void add_canonical_objref(ObjRef objref); // get object store associated with a workerid ObjStoreId get_store(WorkerId workerid); // register a function with the scheduler @@ -70,6 +76,14 @@ public: private: // pick an objectstore that holds a given object (needs protection by objtable_lock_) ObjStoreId pick_objstore(ObjRef objref); + // checks if objref is a canonical objref + bool is_canonical(ObjRef objref); + // checks if aliasing for objref has been completed + bool has_canonical_objref(ObjRef objref); + // get the canonical objref for an objref + ObjRef get_canonical_objref(ObjRef objref); + // attempt to notify the objstore about potential objref aliasing, returns true if successful, if false then retry later + bool attempt_notify_alias(ObjStoreId objstoreid, ObjRef alias_objref, ObjRef canonical_objref); // Vector of all workers registered in the system. Their index in this vector // is the workerid. @@ -82,7 +96,16 @@ private: // vector is the objstoreid. std::vector objstores_; grpc::mutex objstores_lock_; - // Mapping from objref to list of object stores where the object is stored. + + // Mapping from an aliased objref to the objref it is aliased with. If an + // objref is a canonical objref (meaning it is not aliased), then + // target_objrefs_[objref] == objref. For each objref, target_objrefs_[objref] + // is initialized to UNITIALIZED_ALIAS and the correct value is filled later + // when it is known. + std::vector target_objrefs_; + std::mutex target_objrefs_lock_; + + // Mapping from canonical objref to list of object stores where the object is stored. Non-canonical (aliased) objrefs should not be used to index objtable_. ObjTable objtable_; std::mutex objtable_lock_; // Hash map from function names to workers where the function is registered. @@ -94,6 +117,9 @@ private: // List of pending pull calls. std::vector > pull_queue_; std::mutex pull_queue_lock_; + // List of pending alias notifications. Each element consists of (objstoreid, (alias_objref, canonical_objref)). + std::vector > > alias_notification_queue_; + std::mutex alias_notification_queue_lock_; }; #endif diff --git a/src/worker.cc b/src/worker.cc index 9b0ba87a0..b03b8ac3e 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -56,6 +56,7 @@ ObjRef Worker::get_objref() { } slice Worker::get_object(ObjRef objref) { + // get_object assumes that objref is a canonical objref ObjRequest request; request.workerid = workerid_; request.type = ObjRequestType::GET; @@ -83,7 +84,7 @@ void Worker::put_object(ObjRef objref, const Obj* obj) { receive_obj_queue_.receive(&result); uint8_t* target = segmentpool_.get_address(result); std::memcpy(target, &data[0], data.size()); - request.type = ObjRequestType::DONE; + request.type = ObjRequestType::WORKER_DONE; request.metadata_offset = 0; request_obj_queue_.send(&request); } @@ -99,7 +100,7 @@ void Worker::put_arrow(ObjRef objref, PyArrayObject* array) { ObjHandle result; receive_obj_queue_.receive(&result); store_arrow(array, result, &segmentpool_); - request.type = ObjRequestType::DONE; + request.type = ObjRequestType::WORKER_DONE; request.metadata_offset = result.metadata_offset(); request_obj_queue_.send(&request); } @@ -126,6 +127,15 @@ bool Worker::is_arrow(ObjRef objref) { return result.metadata_offset() != 0; } +void Worker::alias_objrefs(ObjRef alias_objref, ObjRef target_objref) { + ClientContext context; + AliasObjRefsRequest request; + request.set_alias_objref(alias_objref); + request.set_target_objref(target_objref); + AckReply reply; + scheduler_stub_->AliasObjRefs(&context, request, &reply); +} + void Worker::register_function(const std::string& name, size_t num_return_vals) { ClientContext context; RegisterFunctionRequest request; diff --git a/src/worker.h b/src/worker.h index 5f5429949..f4bf43993 100644 --- a/src/worker.h +++ b/src/worker.h @@ -58,6 +58,8 @@ class Worker { PyArrayObject* get_arrow(ObjRef objref); // determine if the object stored in objref is an arrow object // TODO(pcm): more general mechanism for this? bool is_arrow(ObjRef objref); + // make `alias_objref` refer to the same object that `target_objref` refers to + void alias_objrefs(ObjRef alias_objref, ObjRef target_objref); // register function with scheduler void register_function(const std::string& name, size_t num_return_vals); // start the worker server which accepts tasks from the scheduler and stores diff --git a/test/runtest.py b/test/runtest.py index 284155fca..ce8b1f5ea 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -140,21 +140,20 @@ class WorkerTest(unittest.TestCase): services.cleanup() -""" class APITest(unittest.TestCase): def testObjRefAliasing(self): - services.start_scheduler(address(IP_ADDRESS, new_scheduler_port())) - time.sleep(0.1) - services.start_objstore(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, new_objstore_port())) - time.sleep(0.2) - worker1 = worker.Worker() - orchpy.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, objstore_port), address(IP_ADDRESS, new_worker_port()), worker1) + w = worker.Worker() test_dir = os.path.dirname(os.path.abspath(__file__)) test_path = os.path.join(test_dir, "testrecv.py") - services.start_worker(test_path, address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, objstore_port), address(IP_ADDRESS, new_worker_port())) -""" + services.start_cluster(num_workers=3, worker_path=test_path, driver_worker=w) + objref = w.remote_call("__main__.test_alias_f", []) + self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5]))) + objref = w.remote_call("__main__.test_alias_g", []) + self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5]))) + objref = w.remote_call("__main__.test_alias_h", []) + self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5]))) if __name__ == '__main__': unittest.main() diff --git a/test/shell.py b/test/shell.py index cd77e4e17..ea42ba121 100644 --- a/test/shell.py +++ b/test/shell.py @@ -1,4 +1,5 @@ import argparse +import numpy as np import orchpy import orchpy.services as services @@ -18,6 +19,18 @@ parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address") parser.add_argument("--worker-address", default="127.0.0.1:30001", type=str, help="the worker's address") +@orchpy.distributed([], [np.ndarray]) +def test_alias_f(): + return np.ones([3, 4, 5]) + +@orchpy.distributed([], [np.ndarray]) +def test_alias_g(): + return test_alias_f() + +@orchpy.distributed([], [np.ndarray]) +def test_alias_h(): + return test_alias_g() + @orchpy.distributed([str], [str]) def print_string(string): print "called print_string with", string diff --git a/test/testrecv.py b/test/testrecv.py index 3bac951d0..6cfe36262 100644 --- a/test/testrecv.py +++ b/test/testrecv.py @@ -20,8 +20,11 @@ def test_alias_f(): @orchpy.distributed([], [np.ndarray]) def test_alias_g(): - return f() + return test_alias_f() +@orchpy.distributed([], [np.ndarray]) +def test_alias_h(): + return test_alias_g() @orchpy.distributed([str], [str]) def print_string(string):