make it possible to start multiple nodes in a cluster; add some more logging

This commit is contained in:
Philipp Moritz 2016-04-28 15:31:31 -07:00
parent d45ea8a518
commit d3f62c4288
7 changed files with 88 additions and 21 deletions

View file

@ -81,7 +81,26 @@ def start_worker(test_path, scheduler_address, objstore_address, worker_address)
"--worker-address=" + worker_address])
all_processes.append((p, worker_address))
def start_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None):
def start_node(scheduler_address, node_ip_address, num_workers, worker_path=None):
"""
Start an object store and associated workers that will be part of a larger cluster.
Assumes the scheduler has already been started.
:param scheduler_address: ip address and port of the scheduler (which may run on a different node)
:param node_ip_address: ip address (without port) of the node this function is run on
:param num_workers: the number of workers to be started on this node
:worker_path: path of the source code that will be run on the worker
"""
objstore_address = address(node_ip_address, new_objstore_port())
start_objstore(scheduler_address, objstore_address)
time.sleep(0.2)
for _ in range(num_workers):
start_worker(worker_path, scheduler_address, objstore_address, address(node_ip_address, new_worker_port()))
time.sleep(0.3)
orchpy.connect(scheduler_address, objstore_address, address(node_ip_address, new_worker_port()))
time.sleep(0.5)
def start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=0, worker_path=None):
global drivers
if num_workers_per_objstore > 0 and worker_path is None:
raise Exception("Attempting to start a cluster with {} workers per object store, but `worker_path` is None.".format(num_workers_per_objstore))

View file

@ -1,5 +1,7 @@
#include "objstore.h"
#include <chrono>
#include "utils.h"
const size_t ObjStoreService::CHUNK_SIZE = 8 * 1024;
@ -262,14 +264,17 @@ void ObjStoreService::process_requests() {
recv_queue_.receive(&request);
switch (request.type) {
case ObjRequestType::ALLOC: {
ORCH_LOG(ORCH_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Allocate object with objref " << request.objref << " and size " << request.size);
process_worker_request(request);
}
break;
case ObjRequestType::GET: {
ORCH_LOG(ORCH_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Get object with objref " << request.objref);
process_worker_request(request);
}
break;
case ObjRequestType::WORKER_DONE: {
ORCH_LOG(ORCH_VERBOSE, "Request (worker " << request.workerid << " to objstore " << objstoreid_ << "): Finalize object with objref " << request.objref);
process_worker_request(request);
}
break;
@ -304,6 +309,7 @@ ObjHandle ObjStoreService::alloc(ObjRef objref, size_t size) {
ObjHandle handle = segmentpool_->allocate(size);
segmentpool_lock_.unlock();
std::lock_guard<std::mutex> memory_lock(memory_lock_);
ORCH_LOG(ORCH_VERBOSE, "Allocating space for objref " << objref << " on object store " << objstoreid_);
if (memory_[objref].second != MemoryStatusType::NOT_PRESENT && memory_[objref].second != MemoryStatusType::PRE_ALLOCED) {
ORCH_LOG(ORCH_FATAL, "Attempting to allocate space for objref " << objref << ", but memory_[objref].second = " << memory_[objref].second);
}
@ -346,8 +352,11 @@ void start_objstore(const char* scheduler_addr, const char* objstore_addr) {
std::string objstore_address(objstore_addr);
ObjStoreService service(objstore_address, scheduler_channel);
service.start_objstore_service();
std::string::iterator split_point = split_ip_address(objstore_address);
std::string port;
port.assign(split_point, objstore_address.end());
ServerBuilder builder;
builder.AddListeningPort(std::string(objstore_addr), grpc::InsecureServerCredentials());
builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
@ -356,6 +365,7 @@ void start_objstore(const char* scheduler_addr, const char* objstore_addr) {
int main(int argc, char** argv) {
if (argc != 3) {
ORCH_LOG(ORCH_FATAL, "object store: expected two arguments (scheduler ip address and object store ip address)");
return 1;
}

View file

@ -1,8 +1,10 @@
#include "scheduler.h"
#include <random>
#include <thread>
#include <chrono>
#include "scheduler.h"
#include "utils.h"
Status SchedulerService::RemoteCall(ServerContext* context, const RemoteCallRequest* request, RemoteCallReply* reply) {
std::unique_ptr<Call> task(new Call(request->call())); // need to copy, because request is const
@ -661,18 +663,24 @@ void SchedulerService::get_equivalent_objrefs(ObjRef objref, std::vector<ObjRef>
upstream_objrefs(downstream_objref, equivalent_objrefs);
}
void start_scheduler_service(const char* server_address) {
void start_scheduler_service(const char* service_addr) {
std::string service_address(service_addr);
std::string::iterator split_point = split_ip_address(service_address);
std::string port;
port.assign(split_point, service_address.end());
SchedulerService service;
ServerBuilder builder;
builder.AddListeningPort(std::string(server_address), grpc::InsecureServerCredentials());
builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
server->Wait();
}
int main(int argc, char** argv) {
if (argc != 2)
if (argc != 2) {
ORCH_LOG(ORCH_FATAL, "scheduler: expected one argument (scheduler ip address)");
return 1;
}
start_scheduler_service(argv[1]);
return 0;
}

24
src/utils.h Normal file
View file

@ -0,0 +1,24 @@
#ifndef ORCHESTRA_UTILS_H
#define ORCHESTRA_UTILS_H
std::string::iterator split_ip_address(std::string& ip_address) {
if (ip_address[0] == '[') { // IPv6
auto split_end = std::find(ip_address.begin() + 1, ip_address.end(), ']');
if(split_end != ip_address.end()) {
split_end++;
}
if(split_end != ip_address.end() && *split_end == ':') {
return split_end;
}
ORCH_LOG(ORCH_FATAL, "ip address should contain a port number");
} else { // IPv4
auto split_point = std::find(ip_address.rbegin(), ip_address.rend(), ':').base();
if (split_point == ip_address.begin()) {
ORCH_LOG(ORCH_FATAL, "ip address should contain a port number");
} else {
return split_point;
}
}
}
#endif

View file

@ -1,5 +1,7 @@
#include "worker.h"
#include "utils.h"
#include <pynumbuf/serialize.h>
extern "C" {
@ -289,14 +291,18 @@ void Worker::scheduler_info(ClientContext &context, SchedulerInfoRequest &reques
// (in our case running in the main thread), whereas the WorkerService will
// run in a separate thread and potentially utilize multiple threads.
void Worker::start_worker_service() {
const char* server_address = worker_address_.c_str();
worker_server_thread_ = std::thread([server_address]() {
WorkerServiceImpl service(server_address);
const char* service_addr = worker_address_.c_str();
worker_server_thread_ = std::thread([service_addr]() {
std::string service_address(service_addr);
std::string::iterator split_point = split_ip_address(service_address);
std::string port;
port.assign(split_point, service_address.end());
WorkerServiceImpl service(service_address);
ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
builder.AddListeningPort(std::string("0.0.0.0:") + port, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
ORCH_LOG(ORCH_INFO, "worker server listening on " << server_address);
ORCH_LOG(ORCH_INFO, "worker server listening on " << service_address);
server->Wait();
});
}

View file

@ -22,7 +22,7 @@ class ArraysSingleTest(unittest.TestCase):
def testMethods(self):
test_dir = os.path.dirname(os.path.abspath(__file__))
test_path = os.path.join(test_dir, "testrecv.py")
services.start_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=test_path)
services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=test_path)
# test eye
ref = single.eye(3, "float")
@ -54,7 +54,7 @@ class ArraysSingleTest(unittest.TestCase):
class ArraysDistTest(unittest.TestCase):
def testSerialization(self):
[w] = services.start_cluster(return_drivers=True)
[w] = services.start_singlenode_cluster(return_drivers=True)
x = dist.DistArray()
x.construct([2, 3, 4], np.array([[[orchpy.push(0, w)]]]))
@ -68,7 +68,7 @@ class ArraysDistTest(unittest.TestCase):
def testAssemble(self):
test_dir = os.path.dirname(os.path.abspath(__file__))
test_path = os.path.join(test_dir, "testrecv.py")
services.start_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=test_path)
services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=1, worker_path=test_path)
a = single.ones([dist.BLOCK_SIZE, dist.BLOCK_SIZE], "float")
b = single.zeros([dist.BLOCK_SIZE, dist.BLOCK_SIZE], "float")
@ -81,7 +81,7 @@ class ArraysDistTest(unittest.TestCase):
def testMethods(self):
test_dir = os.path.dirname(os.path.abspath(__file__))
test_path = os.path.join(test_dir, "testrecv.py")
services.start_cluster(return_drivers=False, num_objstores=2, num_workers_per_objstore=8, worker_path=test_path)
services.start_singlenode_cluster(return_drivers=False, num_objstores=2, num_workers_per_objstore=8, worker_path=test_path)
x = dist.zeros([9, 25, 51], "float")
y = dist.assemble(x)

View file

@ -31,7 +31,7 @@ class SerializationTest(unittest.TestCase):
self.assertTrue((a == c).all())
def testSerialize(self):
[w] = services.start_cluster(return_drivers=True)
[w] = services.start_singlenode_cluster(return_drivers=True)
self.roundTripTest(w, [1, "hello", 3.0])
self.roundTripTest(w, 42)
@ -69,7 +69,7 @@ class ObjStoreTest(unittest.TestCase):
# Test setting up object stores, transfering data between them and retrieving data to a client
def testObjStore(self):
[w1, w2] = services.start_cluster(return_drivers=True, num_objstores=2, num_workers_per_objstore=0)
[w1, w2] = services.start_singlenode_cluster(return_drivers=True, num_objstores=2, num_workers_per_objstore=0)
# pushing and pulling an object shouldn't change it
for data in ["h", "h" * 10000, 0, 0.0]:
@ -120,7 +120,7 @@ class SchedulerTest(unittest.TestCase):
def testCall(self):
test_dir = os.path.dirname(os.path.abspath(__file__))
test_path = os.path.join(test_dir, "testrecv.py")
[w] = services.start_cluster(return_drivers=True, num_workers_per_objstore=1, worker_path=test_path)
[w] = services.start_singlenode_cluster(return_drivers=True, num_workers_per_objstore=1, worker_path=test_path)
value_before = "test_string"
objref = w.remote_call("test_functions.print_string", [value_before])
@ -137,7 +137,7 @@ class SchedulerTest(unittest.TestCase):
class WorkerTest(unittest.TestCase):
def testPushPull(self):
[w] = services.start_cluster(return_drivers=True)
[w] = services.start_singlenode_cluster(return_drivers=True)
for i in range(100):
value_before = i * 10 ** 6
@ -170,7 +170,7 @@ class APITest(unittest.TestCase):
def testObjRefAliasing(self):
test_dir = os.path.dirname(os.path.abspath(__file__))
test_path = os.path.join(test_dir, "testrecv.py")
[w] = services.start_cluster(return_drivers=True, num_workers_per_objstore=3, worker_path=test_path)
[w] = services.start_singlenode_cluster(return_drivers=True, num_workers_per_objstore=3, worker_path=test_path)
objref = w.remote_call("test_functions.test_alias_f", [])
self.assertTrue(np.alltrue(orchpy.pull(objref[0], w) == np.ones([3, 4, 5])))
@ -186,7 +186,7 @@ class ReferenceCountingTest(unittest.TestCase):
def testDeallocation(self):
test_dir = os.path.dirname(os.path.abspath(__file__))
test_path = os.path.join(test_dir, "testrecv.py")
services.start_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=test_path)
services.start_singlenode_cluster(return_drivers=False, num_workers_per_objstore=3, worker_path=test_path)
x = test_functions.test_alias_f()
orchpy.pull(x)