Merge pull request #4 from amplab/test

Test
This commit is contained in:
Philipp Moritz 2016-03-09 11:41:58 -08:00
commit b5a210b715
5 changed files with 121 additions and 128 deletions

View file

@ -9,24 +9,37 @@ all_processes = []
def cleanup():
global all_processes
timeout_sec = 5
for p in all_processes:
p_sec = 0
for second in range(timeout_sec):
if p.poll() == None:
time.sleep(0.1)
p_sec += 1
if p_sec >= timeout_sec:
p.kill() # supported from python 2.6
print 'helper processes shut down!'
for p, port in all_processes:
if p.poll() is not None: # process has already terminated
print "Process at port " + str(port) + " has already terminated."
continue
print "Attempting to kill process at port " + str(port) + "."
p.kill()
time.sleep(0.05) # is this necessary?
if p.poll() is not None:
print "Successfully killed process at port " + str(port) + "."
continue
print "Kill attempt failed, attempting to terminate process at port " + str(port) + "."
p.terminate()
time.sleep(0.05) # is this necessary?
if p.poll is not None:
print "Successfully terminated process at port " + str(port) + "."
continue
print "Termination attempt failed, giving up."
all_processes = []
atexit.register(cleanup)
def start_scheduler(scheduler_address):
def start_scheduler(host, port):
scheduler_address = host + ":" + str(port)
p = subprocess.Popen([os.path.join(_services_path, "scheduler"), str(scheduler_address)])
all_processes.append(p)
all_processes.append((p, port))
def start_objstore(objstore_address):
def start_objstore(host, port):
objstore_address = host + ":" + str(port)
p = subprocess.Popen([os.path.join(_services_path, "objstore"), str(objstore_address)])
all_processes.append(p)
all_processes.append((p, port))
def start_worker(test_path, host, scheduler_port, worker_port, objstore_port):
p = subprocess.Popen(["python", test_path, host, str(scheduler_port), str(worker_port), str(objstore_port)])
all_processes.append((p, worker_port))

View file

@ -269,8 +269,8 @@ cdef class Worker:
print "after get data"
return unison.deserialize_from_string(data)
cpdef register_function(self, func_name, function, num_args):
orch_register_function(self.context, func_name, num_args)
cpdef register_function(self, func_name, function, num_return_vals):
orch_register_function(self.context, func_name, num_return_vals)
self.functions[func_name] = function
cpdef wait_for_next_task(self):
@ -314,7 +314,7 @@ def distributed(types, return_types, worker=global_worker):
if i < len(types) - 1:
arguments.append(worker.pull(arg))
elif i == len(types) - 1 and types[-1] is not None:
arguments.append(global_worker.pull(arg))
arguments.append(worker.pull(arg))
elif types[-1] is None:
arguments.append(worker.pull(arg))
else:

View file

@ -13,37 +13,9 @@ from grpc.beta import implementations
import orchestra_pb2
import types_pb2
"""
class UnisonTest(unittest.TestCase):
def testSerialize(self):
d = [1, 2L, "hello", 3.0]
res = unison.serialize_args(d)
c = unison.deserialize_args(res)
self.assertEqual(c, d)
d = [{'hello': 'world'}]
res = unison.serialize_args(d)
c = unison.deserialize_args(res)
self.assertEqual(c, d)
a = np.zeros((100, 100))
res = unison.serialize_args(a)
b = unison.deserialize_args(res)
self.assertTrue((a == b).all())
a = [unison.ObjRef(42, int)]
res = unison.serialize_args(a)
b = unison.deserialize_args(res)
self.assertEqual(a, b)
"""
IP_ADDRESS = "127.0.0.1"
TIMEOUT_SECONDS = 5
def produce_data(num_chunks):
for _ in range(num_chunks):
yield orchestra_pb2.ObjChunk(objref=1, totalsize=1000, data=b"hello world")
def connect_to_scheduler(host, port):
channel = implementations.insecure_channel(host, port)
return orchestra_pb2.beta_create_Scheduler_stub(channel)
@ -52,96 +24,104 @@ def connect_to_objstore(host, port):
channel = implementations.insecure_channel(host, port)
return orchestra_pb2.beta_create_ObjStore_stub(channel)
def address(host, port):
return host + ":" + str(port)
scheduler_port_counter = 0
def new_scheduler_port():
global scheduler_port_counter
scheduler_port_counter += 1
return 10000 + scheduler_port_counter
worker_port_counter = 0
def new_worker_port():
global worker_port_counter
worker_port_counter += 1
return 40000 + worker_port_counter
objstore_port_counter = 0
def new_objstore_port():
global objstore_port_counter
objstore_port_counter += 1
return 20000 + objstore_port_counter
class ObjStoreTest(unittest.TestCase):
"""Test setting up object stores, transfering data between them and retrieving data to a client"""
def testObjStore(self):
services.start_scheduler("0.0.0.0:22221")
services.start_objstore("0.0.0.0:22222")
services.start_objstore("0.0.0.0:22223")
scheduler_port = new_scheduler_port()
objstore1_port = new_objstore_port()
objstore2_port = new_objstore_port()
worker1_port = new_worker_port()
worker2_port = new_worker_port()
services.start_scheduler(IP_ADDRESS, scheduler_port)
services.start_objstore(IP_ADDRESS, objstore1_port)
services.start_objstore(IP_ADDRESS, objstore2_port)
time.sleep(0.2)
scheduler_stub = connect_to_scheduler('localhost', 22221)
objstore1_stub = connect_to_objstore('localhost', 22222)
objstore2_stub = connect_to_objstore('localhost', 22223)
scheduler_stub = connect_to_scheduler(IP_ADDRESS, scheduler_port)
objstore1_stub = connect_to_objstore(IP_ADDRESS, objstore1_port)
objstore2_stub = connect_to_objstore(IP_ADDRESS, objstore2_port)
worker1 = worker.Worker()
worker1.connect("127.0.0.1:22221", "127.0.0.1:40000", "127.0.0.1:22222")
worker1.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, worker1_port), address(IP_ADDRESS, objstore1_port))
worker2 = worker.Worker()
worker2.connect("127.0.0.1:22221", "127.0.0.1:40001", "127.0.0.1:22223")
worker2.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, worker2_port), address(IP_ADDRESS, objstore2_port))
for i in range(1, 100):
l = i * 100 * "h"
objref = worker1.push(l)
response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.get_id(), objstore_address="0.0.0.0:22223"), TIMEOUT_SECONDS)
s = worker2.get_serialized(objref.get_id())
result = worker.unison.deserialize_from_string(s)
# result = worker1.pull(objref)
self.assertEqual(len(result), 100 * i)
# pushing and pulling an object shouldn't change it
for data in ["h", "h" * 10000, 0, 0.0]:
objref = worker1.push(data)
result = worker1.pull(objref)
self.assertEqual(result, data)
# pushing an object, shipping it to another worker, and pulling it shouldn't change it
for data in ["h", "h" * 10000, 0, 0.0]:
objref = worker1.push(data)
response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.get_id(), objstore_address=address(IP_ADDRESS, objstore2_port)), TIMEOUT_SECONDS)
result = worker2.pull(objref)
self.assertEqual(result, data)
services.cleanup()
class SchedulerTest(unittest.TestCase):
def testCall(self):
services.start_scheduler("0.0.0.0:22221")
services.start_objstore("0.0.0.0:22222")
scheduler_port = new_scheduler_port()
objstore_port = new_objstore_port()
worker1_port = new_worker_port()
worker2_port = new_worker_port()
services.start_scheduler(IP_ADDRESS, scheduler_port)
services.start_objstore(IP_ADDRESS, objstore_port)
time.sleep(0.2)
scheduler_stub = connect_to_scheduler('localhost', 22221)
objstore_stub = connect_to_objstore('localhost', 22222)
scheduler_stub = connect_to_scheduler(IP_ADDRESS, scheduler_port)
objstore_stub = connect_to_objstore(IP_ADDRESS, objstore_port)
time.sleep(0.2)
w = worker.Worker()
w.connect("127.0.0.1:22221", "127.0.0.1:40003", "127.0.0.1:22222")
w.start_worker_service()
w2 = worker.Worker()
w2.connect("127.0.0.1:22221", "127.0.0.1:40004", "127.0.0.1:22222")
w2.start_worker_service()
worker1 = worker.Worker()
worker1.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, worker1_port), address(IP_ADDRESS, objstore_port))
worker1.start_worker_service()
test_dir = os.path.dirname(os.path.abspath(__file__))
test_path = os.path.join(test_dir, "testrecv.py")
services.start_worker(test_path, IP_ADDRESS, scheduler_port, worker2_port, objstore_port)
time.sleep(0.2)
w.register_function("hello_world", None, 2)
w2.register_function("hello_world", None, 2)
time.sleep(0.1)
w.call("hello_world", ["hi"])
worker1.call("hello_world", ["hi"])
time.sleep(0.1)
reply = scheduler_stub.GetDebugInfo(orchestra_pb2.GetDebugInfoRequest(), TIMEOUT_SECONDS)
# self.assertEqual(reply.task[0].name, u'hello_world') # doesn't currently work, because scheduler is now invoked on every interaction
test_path = os.path.dirname(os.path.abspath(__file__))
p = subprocess.Popen(["python", os.path.join(test_path, "testrecv.py")])
time.sleep(0.2)
w.call("hello_world", ["hi"])
w.call("hello_world", ["hi"])
w.call("hello_world", ["hi"])
scheduler_stub.PushObj(orchestra_pb2.PushObjRequest(workerid=0), TIMEOUT_SECONDS)
# self.assertEqual(p.wait(), 0, "argument was not received by the test program") # todo: reactivate
# w.main_loop()
# w2.main_loop()
#
# reply = scheduler_stub.GetDebugInfo(orchestra_pb2.GetDebugInfoRequest(do_scheduling=True), TIMEOUT_SECONDS)
# time.sleep(0.1)
# reply = scheduler_stub.GetDebugInfo(orchestra_pb2.GetDebugInfoRequest(), TIMEOUT_SECONDS)
#
# self.assertEqual(list(reply.task), [])
#
# services.cleanup()
services.cleanup()
if __name__ == '__main__':
unittest.main()

View file

@ -1,21 +0,0 @@
import orchpy.unison as unison
import orchpy.services as services
import orchpy.worker as worker
@worker.distributed([str], [str])
def print_string(string):
print "called print_string with", string
return string
@worker.distributed([int, int], [int, int])
def handle_int(a, b):
return a+1, b+1
if __name__ == '__main__':
worker.global_worker.connect("127.0.0.1:22221", "127.0.0.1:40000", "127.0.0.1:22222")
worker.global_worker.register_function("hello_world", print_string, 1)
worker.global_worker.register_function("handle_int", handle_int, 2)
worker.global_worker.start_worker_service()
worker.global_worker.main_loop()

View file

@ -1,13 +1,34 @@
import sys
import orchpy.unison as unison
import orchpy.services as services
import orchpy.worker as worker
import sys
@worker.distributed([str], [str])
def print_string(string):
print "called print_string with", string
return string
@worker.distributed([int, int], [int, int])
def handle_int(a, b):
return a + 1, b + 1
if __name__ == '__main__':
ip_address = sys.argv[1]
scheduler_port = sys.argv[2]
worker_port = sys.argv[3]
objstore_port = sys.argv[4]
def address(host, port):
return host + ":" + str(port)
worker = worker.Worker()
worker.connect("127.0.0.1:22221", "127.0.0.1:50000", "127.0.0.1:22222")
worker.connect(address(ip_address, scheduler_port), address(ip_address, worker_port), address(ip_address, objstore_port))
worker.start_worker_service()
worker.register_function("hello_world", None, 0)
worker.register_function("print_string", print_string, 0)
worker.register_function("handle_int", handle_int, 0)
name, args, returnref = worker.wait_for_next_task()
print "received args ", args
if args == ["hi"]: