From 70e62313f08a816284aae31b799767c01296acb8 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 8 Mar 2016 16:14:02 -0800 Subject: [PATCH 1/2] cleaning up tests --- lib/orchpy/orchpy/services.py | 12 +++- test/runtest.py | 124 +++++++++++++++++----------------- test/testprogram.py | 12 +++- test/testrecv.py | 13 +++- 4 files changed, 92 insertions(+), 69 deletions(-) diff --git a/lib/orchpy/orchpy/services.py b/lib/orchpy/orchpy/services.py index 17cb368ca..c71d326f0 100644 --- a/lib/orchpy/orchpy/services.py +++ b/lib/orchpy/orchpy/services.py @@ -18,15 +18,21 @@ def cleanup(): p_sec += 1 if p_sec >= timeout_sec: p.kill() # supported from python 2.6 - print 'helper processes shut down!' + print "helper processes shut down!" all_processes = [] atexit.register(cleanup) -def start_scheduler(scheduler_address): +def start_scheduler(host, port): + scheduler_address = host + ":" + str(port) p = subprocess.Popen([os.path.join(_services_path, "scheduler"), str(scheduler_address)]) all_processes.append(p) -def start_objstore(objstore_address): +def start_objstore(host, port): + objstore_address = host + ":" + str(port) p = subprocess.Popen([os.path.join(_services_path, "objstore"), str(objstore_address)]) all_processes.append(p) + +def start_worker(test_path, host, scheduler_port, worker_port, objstore_port): + p = subprocess.Popen(["python", os.path.join(test_path, "testrecv.py"), host, str(scheduler_port), str(worker_port), str(objstore_port)]) + all_processes.append(p) diff --git a/test/runtest.py b/test/runtest.py index 56cf0fab0..e36653a07 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -13,31 +13,7 @@ from grpc.beta import implementations import orchestra_pb2 import types_pb2 -""" -class UnisonTest(unittest.TestCase): - - def testSerialize(self): - d = [1, 2L, "hello", 3.0] - res = unison.serialize_args(d) - c = unison.deserialize_args(res) - self.assertEqual(c, d) - - d = [{'hello': 'world'}] - res = unison.serialize_args(d) - c = unison.deserialize_args(res) - self.assertEqual(c, d) - - a = np.zeros((100, 100)) - res = unison.serialize_args(a) - b = unison.deserialize_args(res) - self.assertTrue((a == b).all()) - - a = [unison.ObjRef(42, int)] - res = unison.serialize_args(a) - b = unison.deserialize_args(res) - self.assertEqual(a, b) -""" - +IP_ADDRESS = "127.0.0.1" TIMEOUT_SECONDS = 5 def produce_data(num_chunks): @@ -52,30 +28,58 @@ def connect_to_objstore(host, port): channel = implementations.insecure_channel(host, port) return orchestra_pb2.beta_create_ObjStore_stub(channel) +def address(host, port): + return host + ":" + str(port) + +scheduler_port_counter = 0 +def new_scheduler_port(): + global scheduler_port_counter + scheduler_port_counter += 1 + return 10000 + scheduler_port_counter + +worker_port_counter = 0 +def new_worker_port(): + global worker_port_counter + worker_port_counter += 1 + return 40000 + worker_port_counter + +objstore_port_counter = 0 +def new_objstore_port(): + global objstore_port_counter + objstore_port_counter += 1 + return 20000 + objstore_port_counter + + class ObjStoreTest(unittest.TestCase): """Test setting up object stores, transfering data between them and retrieving data to a client""" def testObjStore(self): - services.start_scheduler("0.0.0.0:22221") - services.start_objstore("0.0.0.0:22222") - services.start_objstore("0.0.0.0:22223") + scheduler_port = new_scheduler_port() + objstore1_port = new_objstore_port() + objstore2_port = new_objstore_port() + worker1_port = new_worker_port() + worker2_port = new_worker_port() + + services.start_scheduler(IP_ADDRESS, scheduler_port) + services.start_objstore(IP_ADDRESS, objstore1_port) + services.start_objstore(IP_ADDRESS, objstore2_port) time.sleep(0.2) - scheduler_stub = connect_to_scheduler('localhost', 22221) - objstore1_stub = connect_to_objstore('localhost', 22222) - objstore2_stub = connect_to_objstore('localhost', 22223) + scheduler_stub = connect_to_scheduler(IP_ADDRESS, scheduler_port) + objstore1_stub = connect_to_objstore(IP_ADDRESS, objstore1_port) + objstore2_stub = connect_to_objstore(IP_ADDRESS, objstore2_port) worker1 = worker.Worker() - worker1.connect("127.0.0.1:22221", "127.0.0.1:40000", "127.0.0.1:22222") + worker1.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, worker1_port), address(IP_ADDRESS, objstore1_port)) worker2 = worker.Worker() - worker2.connect("127.0.0.1:22221", "127.0.0.1:40001", "127.0.0.1:22223") + worker2.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, worker2_port), address(IP_ADDRESS, objstore2_port)) for i in range(1, 100): l = i * 100 * "h" objref = worker1.push(l) - response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.get_id(), objstore_address="0.0.0.0:22223"), TIMEOUT_SECONDS) + response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.get_id(), objstore_address=address(IP_ADDRESS, objstore2_port)), TIMEOUT_SECONDS) s = worker2.get_serialized(objref.get_id()) result = worker.unison.deserialize_from_string(s) # result = worker1.pull(objref) @@ -86,31 +90,37 @@ class ObjStoreTest(unittest.TestCase): class SchedulerTest(unittest.TestCase): def testCall(self): - services.start_scheduler("0.0.0.0:22221") - services.start_objstore("0.0.0.0:22222") + scheduler_port = new_scheduler_port() + objstore_port = new_objstore_port() + worker1_port = new_worker_port() + worker2_port = new_worker_port() + worker3_port = new_worker_port() + + services.start_scheduler(IP_ADDRESS, scheduler_port) + services.start_objstore(IP_ADDRESS, objstore_port) time.sleep(0.2) - scheduler_stub = connect_to_scheduler('localhost', 22221) - objstore_stub = connect_to_objstore('localhost', 22222) + scheduler_stub = connect_to_scheduler(IP_ADDRESS, scheduler_port) + objstore_stub = connect_to_objstore(IP_ADDRESS, objstore_port) time.sleep(0.2) - w = worker.Worker() - w.connect("127.0.0.1:22221", "127.0.0.1:40003", "127.0.0.1:22222") - w.start_worker_service() - w2 = worker.Worker() - w2.connect("127.0.0.1:22221", "127.0.0.1:40004", "127.0.0.1:22222") - w2.start_worker_service() + worker1 = worker.Worker() + worker1.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, worker1_port), address(IP_ADDRESS, objstore_port)) + worker1.start_worker_service() + worker2 = worker.Worker() + worker2.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, worker2_port), address(IP_ADDRESS, objstore_port)) + worker2.start_worker_service() time.sleep(0.2) - w.register_function("hello_world", None, 2) - w2.register_function("hello_world", None, 2) + worker1.register_function("hello_world", None, 2) + worker2.register_function("hello_world", None, 2) time.sleep(0.1) - w.call("hello_world", ["hi"]) + worker1.call("hello_world", ["hi"]) time.sleep(0.1) @@ -119,29 +129,17 @@ class SchedulerTest(unittest.TestCase): # self.assertEqual(reply.task[0].name, u'hello_world') # doesn't currently work, because scheduler is now invoked on every interaction test_path = os.path.dirname(os.path.abspath(__file__)) - - p = subprocess.Popen(["python", os.path.join(test_path, "testrecv.py")]) + services.start_worker(test_path, IP_ADDRESS, scheduler_port, worker3_port, objstore_port) time.sleep(0.2) - w.call("hello_world", ["hi"]) - w.call("hello_world", ["hi"]) - w.call("hello_world", ["hi"]) + worker1.call("hello_world", ["hi"]) + worker1.call("hello_world", ["hi"]) + worker1.call("hello_world", ["hi"]) scheduler_stub.PushObj(orchestra_pb2.PushObjRequest(workerid=0), TIMEOUT_SECONDS) # self.assertEqual(p.wait(), 0, "argument was not received by the test program") # todo: reactivate - # w.main_loop() - # w2.main_loop() - # - # reply = scheduler_stub.GetDebugInfo(orchestra_pb2.GetDebugInfoRequest(do_scheduling=True), TIMEOUT_SECONDS) - # time.sleep(0.1) - # reply = scheduler_stub.GetDebugInfo(orchestra_pb2.GetDebugInfoRequest(), TIMEOUT_SECONDS) - # - # self.assertEqual(list(reply.task), []) - # - # services.cleanup() - if __name__ == '__main__': unittest.main() diff --git a/test/testprogram.py b/test/testprogram.py index 5d35e0f16..c97953c60 100644 --- a/test/testprogram.py +++ b/test/testprogram.py @@ -1,3 +1,5 @@ +import sys + import orchpy.unison as unison import orchpy.services as services import orchpy.worker as worker @@ -11,8 +13,16 @@ def print_string(string): def handle_int(a, b): return a+1, b+1 +def address(host, port): + return host + ":" + str(port) + if __name__ == '__main__': - worker.global_worker.connect("127.0.0.1:22221", "127.0.0.1:40000", "127.0.0.1:22222") + ip_address = sys.argv[1] + scheduler_port = sys.argv[2] + worker_port = sys.argv[3] + objstore_port = sys.argv[4] + + worker.global_worker.connect(address(ip_address, scheduler_port), address(ip_address, worker_port), address(ip_address, objstore_port)) worker.global_worker.register_function("hello_world", print_string, 1) worker.global_worker.register_function("handle_int", handle_int, 2) diff --git a/test/testrecv.py b/test/testrecv.py index 68295916e..e58984ea5 100644 --- a/test/testrecv.py +++ b/test/testrecv.py @@ -1,11 +1,20 @@ +import sys + import orchpy.unison as unison import orchpy.services as services import orchpy.worker as worker -import sys if __name__ == '__main__': + ip_address = sys.argv[1] + scheduler_port = sys.argv[2] + worker_port = sys.argv[3] + objstore_port = sys.argv[4] + + def address(host, port): + return host + ":" + str(port) + worker = worker.Worker() - worker.connect("127.0.0.1:22221", "127.0.0.1:50000", "127.0.0.1:22222") + worker.connect(address(ip_address, scheduler_port), address(ip_address, worker_port), address(ip_address, objstore_port)) worker.start_worker_service() worker.register_function("hello_world", None, 0) name, args, returnref = worker.wait_for_next_task() From b9d6cc8de4a32a7d0371bda050c21340632c4674 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Wed, 9 Mar 2016 11:40:36 -0800 Subject: [PATCH 2/2] changing tests --- lib/orchpy/orchpy/services.py | 35 +++++++++++++---------- lib/orchpy/orchpy/worker.pyx | 6 ++-- test/runtest.py | 52 ++++++++++++----------------------- test/testprogram.py | 31 --------------------- test/testrecv.py | 14 +++++++++- 5 files changed, 54 insertions(+), 84 deletions(-) delete mode 100644 test/testprogram.py diff --git a/lib/orchpy/orchpy/services.py b/lib/orchpy/orchpy/services.py index c71d326f0..5465989d2 100644 --- a/lib/orchpy/orchpy/services.py +++ b/lib/orchpy/orchpy/services.py @@ -9,16 +9,23 @@ all_processes = [] def cleanup(): global all_processes - timeout_sec = 5 - for p in all_processes: - p_sec = 0 - for second in range(timeout_sec): - if p.poll() == None: - time.sleep(0.1) - p_sec += 1 - if p_sec >= timeout_sec: - p.kill() # supported from python 2.6 - print "helper processes shut down!" + for p, port in all_processes: + if p.poll() is not None: # process has already terminated + print "Process at port " + str(port) + " has already terminated." + continue + print "Attempting to kill process at port " + str(port) + "." + p.kill() + time.sleep(0.05) # is this necessary? + if p.poll() is not None: + print "Successfully killed process at port " + str(port) + "." + continue + print "Kill attempt failed, attempting to terminate process at port " + str(port) + "." + p.terminate() + time.sleep(0.05) # is this necessary? + if p.poll is not None: + print "Successfully terminated process at port " + str(port) + "." + continue + print "Termination attempt failed, giving up." all_processes = [] atexit.register(cleanup) @@ -26,13 +33,13 @@ atexit.register(cleanup) def start_scheduler(host, port): scheduler_address = host + ":" + str(port) p = subprocess.Popen([os.path.join(_services_path, "scheduler"), str(scheduler_address)]) - all_processes.append(p) + all_processes.append((p, port)) def start_objstore(host, port): objstore_address = host + ":" + str(port) p = subprocess.Popen([os.path.join(_services_path, "objstore"), str(objstore_address)]) - all_processes.append(p) + all_processes.append((p, port)) def start_worker(test_path, host, scheduler_port, worker_port, objstore_port): - p = subprocess.Popen(["python", os.path.join(test_path, "testrecv.py"), host, str(scheduler_port), str(worker_port), str(objstore_port)]) - all_processes.append(p) + p = subprocess.Popen(["python", test_path, host, str(scheduler_port), str(worker_port), str(objstore_port)]) + all_processes.append((p, worker_port)) diff --git a/lib/orchpy/orchpy/worker.pyx b/lib/orchpy/orchpy/worker.pyx index d7633bbbe..146cabf8c 100644 --- a/lib/orchpy/orchpy/worker.pyx +++ b/lib/orchpy/orchpy/worker.pyx @@ -269,8 +269,8 @@ cdef class Worker: print "after get data" return unison.deserialize_from_string(data) - cpdef register_function(self, func_name, function, num_args): - orch_register_function(self.context, func_name, num_args) + cpdef register_function(self, func_name, function, num_return_vals): + orch_register_function(self.context, func_name, num_return_vals) self.functions[func_name] = function cpdef wait_for_next_task(self): @@ -314,7 +314,7 @@ def distributed(types, return_types, worker=global_worker): if i < len(types) - 1: arguments.append(worker.pull(arg)) elif i == len(types) - 1 and types[-1] is not None: - arguments.append(global_worker.pull(arg)) + arguments.append(worker.pull(arg)) elif types[-1] is None: arguments.append(worker.pull(arg)) else: diff --git a/test/runtest.py b/test/runtest.py index e36653a07..43f584dc8 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -16,10 +16,6 @@ import types_pb2 IP_ADDRESS = "127.0.0.1" TIMEOUT_SECONDS = 5 -def produce_data(num_chunks): - for _ in range(num_chunks): - yield orchestra_pb2.ObjChunk(objref=1, totalsize=1000, data=b"hello world") - def connect_to_scheduler(host, port): channel = implementations.insecure_channel(host, port) return orchestra_pb2.beta_create_Scheduler_stub(channel) @@ -76,14 +72,18 @@ class ObjStoreTest(unittest.TestCase): worker2 = worker.Worker() worker2.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, worker2_port), address(IP_ADDRESS, objstore2_port)) - for i in range(1, 100): - l = i * 100 * "h" - objref = worker1.push(l) - response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.get_id(), objstore_address=address(IP_ADDRESS, objstore2_port)), TIMEOUT_SECONDS) - s = worker2.get_serialized(objref.get_id()) - result = worker.unison.deserialize_from_string(s) - # result = worker1.pull(objref) - self.assertEqual(len(result), 100 * i) + # pushing and pulling an object shouldn't change it + for data in ["h", "h" * 10000, 0, 0.0]: + objref = worker1.push(data) + result = worker1.pull(objref) + self.assertEqual(result, data) + + # pushing an object, shipping it to another worker, and pulling it shouldn't change it + for data in ["h", "h" * 10000, 0, 0.0]: + objref = worker1.push(data) + response = objstore1_stub.DeliverObj(orchestra_pb2.DeliverObjRequest(objref=objref.get_id(), objstore_address=address(IP_ADDRESS, objstore2_port)), TIMEOUT_SECONDS) + result = worker2.pull(objref) + self.assertEqual(result, data) services.cleanup() @@ -94,7 +94,6 @@ class SchedulerTest(unittest.TestCase): objstore_port = new_objstore_port() worker1_port = new_worker_port() worker2_port = new_worker_port() - worker3_port = new_worker_port() services.start_scheduler(IP_ADDRESS, scheduler_port) services.start_objstore(IP_ADDRESS, objstore_port) @@ -109,37 +108,20 @@ class SchedulerTest(unittest.TestCase): worker1 = worker.Worker() worker1.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, worker1_port), address(IP_ADDRESS, objstore_port)) worker1.start_worker_service() - worker2 = worker.Worker() - worker2.connect(address(IP_ADDRESS, scheduler_port), address(IP_ADDRESS, worker2_port), address(IP_ADDRESS, objstore_port)) - worker2.start_worker_service() + + test_dir = os.path.dirname(os.path.abspath(__file__)) + test_path = os.path.join(test_dir, "testrecv.py") + services.start_worker(test_path, IP_ADDRESS, scheduler_port, worker2_port, objstore_port) time.sleep(0.2) - worker1.register_function("hello_world", None, 2) - worker2.register_function("hello_world", None, 2) - - time.sleep(0.1) - worker1.call("hello_world", ["hi"]) time.sleep(0.1) reply = scheduler_stub.GetDebugInfo(orchestra_pb2.GetDebugInfoRequest(), TIMEOUT_SECONDS) - # self.assertEqual(reply.task[0].name, u'hello_world') # doesn't currently work, because scheduler is now invoked on every interaction - - test_path = os.path.dirname(os.path.abspath(__file__)) - services.start_worker(test_path, IP_ADDRESS, scheduler_port, worker3_port, objstore_port) - - time.sleep(0.2) - - worker1.call("hello_world", ["hi"]) - worker1.call("hello_world", ["hi"]) - worker1.call("hello_world", ["hi"]) - - scheduler_stub.PushObj(orchestra_pb2.PushObjRequest(workerid=0), TIMEOUT_SECONDS) - - # self.assertEqual(p.wait(), 0, "argument was not received by the test program") # todo: reactivate + services.cleanup() if __name__ == '__main__': unittest.main() diff --git a/test/testprogram.py b/test/testprogram.py deleted file mode 100644 index c97953c60..000000000 --- a/test/testprogram.py +++ /dev/null @@ -1,31 +0,0 @@ -import sys - -import orchpy.unison as unison -import orchpy.services as services -import orchpy.worker as worker - -@worker.distributed([str], [str]) -def print_string(string): - print "called print_string with", string - return string - -@worker.distributed([int, int], [int, int]) -def handle_int(a, b): - return a+1, b+1 - -def address(host, port): - return host + ":" + str(port) - -if __name__ == '__main__': - ip_address = sys.argv[1] - scheduler_port = sys.argv[2] - worker_port = sys.argv[3] - objstore_port = sys.argv[4] - - worker.global_worker.connect(address(ip_address, scheduler_port), address(ip_address, worker_port), address(ip_address, objstore_port)) - - worker.global_worker.register_function("hello_world", print_string, 1) - worker.global_worker.register_function("handle_int", handle_int, 2) - worker.global_worker.start_worker_service() - - worker.global_worker.main_loop() diff --git a/test/testrecv.py b/test/testrecv.py index e58984ea5..13383a728 100644 --- a/test/testrecv.py +++ b/test/testrecv.py @@ -4,6 +4,15 @@ import orchpy.unison as unison import orchpy.services as services import orchpy.worker as worker +@worker.distributed([str], [str]) +def print_string(string): + print "called print_string with", string + return string + +@worker.distributed([int, int], [int, int]) +def handle_int(a, b): + return a + 1, b + 1 + if __name__ == '__main__': ip_address = sys.argv[1] scheduler_port = sys.argv[2] @@ -16,7 +25,10 @@ if __name__ == '__main__': worker = worker.Worker() worker.connect(address(ip_address, scheduler_port), address(ip_address, worker_port), address(ip_address, objstore_port)) worker.start_worker_service() - worker.register_function("hello_world", None, 0) + + worker.register_function("print_string", print_string, 0) + worker.register_function("handle_int", handle_int, 0) + name, args, returnref = worker.wait_for_next_task() print "received args ", args if args == ["hi"]: