2016-02-22 13:55:06 -08:00
import unittest
2016-06-10 14:12:15 -07:00
import ray
import ray . serialization as serialization
import ray . services as services
import ray . worker as worker
2016-02-22 13:55:06 -08:00
import numpy as np
import time
2016-03-01 01:02:08 -08:00
import subprocess32 as subprocess
import os
2016-04-18 13:05:36 -07:00
import test_functions
2016-06-27 11:35:31 -07:00
import ray . array . remote as ra
import ray . array . distributed as da
2016-04-18 13:05:36 -07:00
2016-06-21 17:39:48 -07:00
RAY_TEST_OBJECTS = [ [ 1 , " hello " , 3.0 ] , 42 , " hello world " , 42.0 ,
( 1.0 , " hi " ) , None , ( None , None ) , ( " hello " , None ) ,
True , False , ( True , False ) ,
{ True : " hello " , False : " world " } ,
2016-06-23 18:39:02 -07:00
{ " hello " : " world " , 1 : 42 , 1.0 : 45 } , { } ,
np . int8 ( 3 ) , np . int32 ( 4 ) , np . int64 ( 5 ) ,
np . uint8 ( 3 ) , np . uint32 ( 4 ) , np . uint64 ( 5 ) ,
np . float32 ( 1.0 ) , np . float64 ( 1.0 ) ]
2016-06-21 17:39:48 -07:00
class UserDefinedType ( object ) :
def __init__ ( self ) :
pass
def deserialize ( self , primitives ) :
return " user defined type "
def serialize ( self ) :
return " user defined type "
2016-03-10 12:35:31 -08:00
class SerializationTest ( unittest . TestCase ) :
2016-06-27 11:57:22 -07:00
def roundTripTest ( self , data ) :
serialized , _ = serialization . serialize ( ray . worker . global_worker . handle , data )
result = serialization . deserialize ( ray . worker . global_worker . handle , serialized )
2016-03-10 12:35:31 -08:00
self . assertEqual ( data , result )
2016-06-27 11:57:22 -07:00
def numpyTypeTest ( self , typ ) :
2016-03-15 13:06:51 -07:00
a = np . random . randint ( 0 , 10 , size = ( 100 , 100 ) ) . astype ( typ )
2016-06-27 11:57:22 -07:00
b , _ = serialization . serialize ( ray . worker . global_worker . handle , a )
c = serialization . deserialize ( ray . worker . global_worker . handle , b )
2016-03-15 13:06:51 -07:00
self . assertTrue ( ( a == c ) . all ( ) )
2016-06-23 18:39:02 -07:00
a = np . array ( 0 ) . astype ( typ )
2016-06-27 11:57:22 -07:00
b , _ = serialization . serialize ( ray . worker . global_worker . handle , a )
c = serialization . deserialize ( ray . worker . global_worker . handle , b )
2016-06-23 18:39:02 -07:00
self . assertTrue ( ( a == c ) . all ( ) )
a = np . empty ( ( 0 , ) ) . astype ( typ )
2016-06-27 11:57:22 -07:00
b , _ = serialization . serialize ( ray . worker . global_worker . handle , a )
c = serialization . deserialize ( ray . worker . global_worker . handle , b )
2016-06-23 18:39:02 -07:00
self . assertTrue ( a . dtype == c . dtype )
2016-03-10 12:35:31 -08:00
def testSerialize ( self ) :
2016-06-27 11:57:22 -07:00
services . start_ray_local ( )
2016-03-10 12:35:31 -08:00
2016-06-21 17:39:48 -07:00
for val in RAY_TEST_OBJECTS :
2016-06-27 11:57:22 -07:00
self . roundTripTest ( val )
2016-03-24 23:35:38 -07:00
2016-03-10 12:35:31 -08:00
a = np . zeros ( ( 100 , 100 ) )
2016-06-27 11:57:22 -07:00
res , _ = serialization . serialize ( ray . worker . global_worker . handle , a )
b = serialization . deserialize ( ray . worker . global_worker . handle , res )
2016-03-10 12:35:31 -08:00
self . assertTrue ( ( a == b ) . all ( ) )
2016-03-08 16:14:02 -08:00
2016-06-27 11:57:22 -07:00
self . numpyTypeTest ( " int8 " )
self . numpyTypeTest ( " uint8 " )
self . numpyTypeTest ( " int16 " )
self . numpyTypeTest ( " uint16 " )
self . numpyTypeTest ( " int32 " )
self . numpyTypeTest ( " uint32 " )
self . numpyTypeTest ( " float32 " )
self . numpyTypeTest ( " float64 " )
2016-04-18 13:05:36 -07:00
2016-06-27 11:57:22 -07:00
ref0 = ray . put ( 0 )
ref1 = ray . put ( 0 )
ref2 = ray . put ( 0 )
ref3 = ray . put ( 0 )
2016-06-10 16:32:48 -07:00
2016-04-18 13:05:36 -07:00
a = np . array ( [ [ ref0 , ref1 ] , [ ref2 , ref3 ] ] )
2016-06-27 11:57:22 -07:00
capsule , _ = serialization . serialize ( ray . worker . global_worker . handle , a )
result = serialization . deserialize ( ray . worker . global_worker . handle , capsule )
2016-03-15 13:06:51 -07:00
self . assertTrue ( ( a == result ) . all ( ) )
2016-06-27 11:57:22 -07:00
self . roundTripTest ( ref0 )
self . roundTripTest ( [ ref0 , ref1 , ref2 , ref3 ] )
self . roundTripTest ( { " 0 " : ref0 , " 1 " : ref1 , " 2 " : ref2 , " 3 " : ref3 } )
self . roundTripTest ( ( ref0 , 1 ) )
2016-06-10 16:32:48 -07:00
2016-04-18 13:05:36 -07:00
services . cleanup ( )
2016-03-12 15:25:45 -08:00
2016-02-22 13:55:06 -08:00
class ObjStoreTest ( unittest . TestCase ) :
2016-03-15 13:06:51 -07:00
# Test setting up object stores, transfering data between them and retrieving data to a client
2016-02-22 13:55:06 -08:00
def testObjStore ( self ) :
2016-06-27 11:57:22 -07:00
[ w1 , w2 ] = services . start_services_local ( return_drivers = True , num_objstores = 2 , num_workers_per_objstore = 0 )
2016-02-22 13:55:06 -08:00
2016-06-23 12:58:48 -07:00
# putting and getting an object shouldn't change it
2016-03-09 11:40:36 -08:00
for data in [ " h " , " h " * 10000 , 0 , 0.0 ] :
2016-06-23 12:58:48 -07:00
objref = ray . put ( data , w1 )
result = ray . get ( objref , w1 )
2016-03-09 11:40:36 -08:00
self . assertEqual ( result , data )
2016-06-23 12:58:48 -07:00
# putting an object, shipping it to another worker, and getting it shouldn't change it
2016-04-24 19:06:14 -07:00
for data in [ " h " , " h " * 10000 , 0 , 0.0 , [ 1 , 2 , 3 , " a " , ( 1 , 2 ) ] , ( " a " , ( " b " , 3 ) ) ] :
2016-06-23 12:58:48 -07:00
objref = worker . put ( data , w1 )
result = worker . get ( objref , w2 )
2016-04-24 19:06:14 -07:00
self . assertEqual ( result , data )
2016-06-23 12:58:48 -07:00
# putting an array, shipping it to another worker, and getting it shouldn't change it
2016-04-24 19:06:14 -07:00
for data in [ np . zeros ( [ 10 , 20 ] ) , np . random . normal ( size = [ 45 , 25 ] ) ] :
2016-06-23 12:58:48 -07:00
objref = worker . put ( data , w1 )
result = worker . get ( objref , w2 )
2016-04-24 19:06:14 -07:00
self . assertTrue ( np . alltrue ( result == data ) )
"""
2016-06-23 12:58:48 -07:00
# getting multiple times shouldn't matter
2016-04-24 19:06:14 -07:00
for data in [ np . zeros ( [ 10 , 20 ] ) , np . random . normal ( size = [ 45 , 25 ] ) , np . zeros ( [ 10 , 20 ] , dtype = np . dtype ( " float64 " ) ) , np . zeros ( [ 10 , 20 ] , dtype = np . dtype ( " float32 " ) ) , np . zeros ( [ 10 , 20 ] , dtype = np . dtype ( " int64 " ) ) , np . zeros ( [ 10 , 20 ] , dtype = np . dtype ( " int32 " ) ) ] :
2016-06-23 12:58:48 -07:00
objref = worker . put ( data , w1 )
result = worker . get ( objref , w2 )
result = worker . get ( objref , w2 )
result = worker . get ( objref , w2 )
2016-04-24 19:06:14 -07:00
self . assertTrue ( np . alltrue ( result == data ) )
"""
# shipping a numpy array inside something else should be fine
data = ( " a " , np . random . normal ( size = [ 10 , 10 ] ) )
2016-06-23 12:58:48 -07:00
objref = worker . put ( data , w1 )
result = worker . get ( objref , w2 )
2016-04-24 19:06:14 -07:00
self . assertTrue ( data [ 0 ] == result [ 0 ] )
self . assertTrue ( np . alltrue ( data [ 1 ] == result [ 1 ] ) )
# shipping a numpy array inside something else should be fine
data = [ " a " , np . random . normal ( size = [ 10 , 10 ] ) ]
2016-06-23 12:58:48 -07:00
objref = worker . put ( data , w1 )
result = worker . get ( objref , w2 )
2016-04-24 19:06:14 -07:00
self . assertTrue ( data [ 0 ] == result [ 0 ] )
self . assertTrue ( np . alltrue ( data [ 1 ] == result [ 1 ] ) )
2016-02-22 13:55:06 -08:00
2016-03-01 01:02:08 -08:00
services . cleanup ( )
2016-02-22 16:06:16 -08:00
2016-03-10 14:40:46 -08:00
class WorkerTest ( unittest . TestCase ) :
2016-06-23 12:58:48 -07:00
def testPutGet ( self ) :
2016-06-27 11:57:22 -07:00
services . start_ray_local ( )
2016-03-10 14:40:46 -08:00
for i in range ( 100 ) :
value_before = i * 10 * * 6
2016-06-27 11:57:22 -07:00
objref = ray . put ( value_before )
value_after = ray . get ( objref )
2016-03-10 14:40:46 -08:00
self . assertEqual ( value_before , value_after )
for i in range ( 100 ) :
value_before = i * 10 * * 6 * 1.0
2016-06-27 11:57:22 -07:00
objref = ray . put ( value_before )
value_after = ray . get ( objref )
2016-03-10 14:40:46 -08:00
self . assertEqual ( value_before , value_after )
for i in range ( 100 ) :
value_before = " h " * i
2016-06-27 11:57:22 -07:00
objref = ray . put ( value_before )
value_after = ray . get ( objref )
2016-03-10 14:40:46 -08:00
self . assertEqual ( value_before , value_after )
for i in range ( 100 ) :
value_before = [ 1 ] * i
2016-06-27 11:57:22 -07:00
objref = ray . put ( value_before )
value_after = ray . get ( objref )
2016-03-10 14:40:46 -08:00
self . assertEqual ( value_before , value_after )
services . cleanup ( )
2016-04-05 00:34:23 -07:00
class APITest ( unittest . TestCase ) :
def testObjRefAliasing ( self ) :
2016-06-19 22:01:31 -07:00
worker_path = os . path . join ( os . path . dirname ( os . path . abspath ( __file__ ) ) , " test_worker.py " )
2016-06-27 11:57:22 -07:00
services . start_ray_local ( num_workers = 3 , worker_path = worker_path )
2016-04-08 12:58:08 -07:00
2016-06-27 11:57:22 -07:00
ref = test_functions . test_alias_f ( )
self . assertTrue ( np . alltrue ( ray . get ( ref ) == np . ones ( [ 3 , 4 , 5 ] ) ) )
ref = test_functions . test_alias_g ( )
self . assertTrue ( np . alltrue ( ray . get ( ref ) == np . ones ( [ 3 , 4 , 5 ] ) ) )
ref = test_functions . test_alias_h ( )
self . assertTrue ( np . alltrue ( ray . get ( ref ) == np . ones ( [ 3 , 4 , 5 ] ) ) )
2016-04-05 00:34:23 -07:00
2016-04-18 13:05:36 -07:00
services . cleanup ( )
2016-06-03 00:10:17 -07:00
def testKeywordArgs ( self ) :
2016-06-19 22:01:31 -07:00
worker_path = os . path . join ( os . path . dirname ( os . path . abspath ( __file__ ) ) , " test_worker.py " )
2016-06-27 11:57:22 -07:00
services . start_ray_local ( num_workers = 1 , worker_path = worker_path )
2016-06-04 16:22:10 -07:00
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct1 ( 1 )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " 1 hello " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct1 ( 1 , " hi " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " 1 hi " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct1 ( 1 , b = " world " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " 1 world " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct2 ( a = " w " , b = " hi " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " w hi " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct2 ( b = " hi " , a = " w " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " w hi " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct2 ( a = " w " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " w world " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct2 ( b = " hi " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " hello hi " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct2 ( " w " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " w world " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct2 ( " w " , " hi " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " w hi " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct3 ( 0 , 1 , c = " w " , d = " hi " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " 0 1 w hi " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct3 ( 0 , 1 , d = " hi " , c = " w " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " 0 1 w hi " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct3 ( 0 , 1 , c = " w " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " 0 1 w world " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct3 ( 0 , 1 , d = " hi " )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " 0 1 hello hi " )
2016-06-03 00:10:17 -07:00
x = test_functions . keyword_fct3 ( 0 , 1 )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " 0 1 hello world " )
2016-06-03 00:10:17 -07:00
services . cleanup ( )
2016-04-18 13:05:36 -07:00
2016-06-04 16:22:10 -07:00
def testVariableNumberOfArgs ( self ) :
2016-06-19 22:01:31 -07:00
worker_path = os . path . join ( os . path . dirname ( os . path . abspath ( __file__ ) ) , " test_worker.py " )
2016-06-27 11:57:22 -07:00
services . start_ray_local ( num_workers = 1 , worker_path = worker_path )
2016-06-04 16:22:10 -07:00
x = test_functions . varargs_fct1 ( 0 , 1 , 2 )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " 0 1 2 " )
2016-06-04 16:22:10 -07:00
x = test_functions . varargs_fct2 ( 0 , 1 , 2 )
2016-06-23 12:58:48 -07:00
self . assertEqual ( ray . get ( x ) , " 1 2 " )
2016-06-04 16:22:10 -07:00
self . assertTrue ( test_functions . kwargs_exception_thrown )
self . assertTrue ( test_functions . varargs_and_kwargs_exception_thrown )
services . cleanup ( )
2016-07-05 15:57:05 -07:00
def testNoArgs ( self ) :
worker_path = os . path . join ( os . path . dirname ( os . path . abspath ( __file__ ) ) , " test_worker.py " )
2016-07-05 18:59:54 -07:00
services . start_ray_local ( num_workers = 1 , worker_path = worker_path , driver_mode = ray . WORKER_MODE )
2016-07-05 15:57:05 -07:00
test_functions . no_op ( )
time . sleep ( 0.2 )
task_info = ray . task_info ( )
self . assertEqual ( len ( task_info [ " failed_tasks " ] ) , 0 )
self . assertEqual ( len ( task_info [ " running_tasks " ] ) , 0 )
self . assertEqual ( task_info [ " num_succeeded " ] , 1 )
test_functions . no_op_fail ( )
time . sleep ( 0.2 )
task_info = ray . task_info ( )
self . assertEqual ( len ( task_info [ " failed_tasks " ] ) , 1 )
self . assertEqual ( len ( task_info [ " running_tasks " ] ) , 0 )
self . assertEqual ( task_info [ " num_succeeded " ] , 1 )
self . assertEqual ( task_info [ " failed_tasks " ] [ 0 ] . get ( " error_message " ) , " The @remote decorator for function test_functions.no_op_fail has 0 return values, but test_functions.no_op_fail returned more than 0 values. " )
services . cleanup ( )
def testTypeChecking ( self ) :
worker_path = os . path . join ( os . path . dirname ( os . path . abspath ( __file__ ) ) , " test_worker.py " )
services . start_ray_local ( num_workers = 1 , worker_path = worker_path , driver_mode = ray . WORKER_MODE )
# Make sure that these functions throw exceptions because there return
# values do not type check.
test_functions . test_return1 ( )
test_functions . test_return2 ( )
time . sleep ( 0.2 )
task_info = ray . task_info ( )
self . assertEqual ( len ( task_info [ " failed_tasks " ] ) , 2 )
self . assertEqual ( len ( task_info [ " running_tasks " ] ) , 0 )
self . assertEqual ( task_info [ " num_succeeded " ] , 0 )
services . cleanup ( )
2016-06-18 01:01:48 +03:00
class TaskStatusTest ( unittest . TestCase ) :
def testFailedTask ( self ) :
2016-06-19 22:01:31 -07:00
worker_path = os . path . join ( os . path . dirname ( os . path . abspath ( __file__ ) ) , " test_worker.py " )
2016-06-27 11:57:22 -07:00
services . start_ray_local ( num_workers = 3 , worker_path = worker_path , driver_mode = ray . WORKER_MODE )
2016-06-18 01:01:48 +03:00
test_functions . test_alias_f ( )
2016-06-25 09:43:57 -07:00
test_functions . throw_exception_fct1 ( )
test_functions . throw_exception_fct1 ( )
2016-06-18 01:01:48 +03:00
time . sleep ( 1 )
result = ray . task_info ( )
2016-06-24 19:43:24 -07:00
self . assertTrue ( len ( result [ " failed_tasks " ] ) == 2 )
2016-06-18 01:01:48 +03:00
task_ids = set ( )
2016-06-24 19:43:24 -07:00
for task in result [ " failed_tasks " ] :
self . assertTrue ( task . has_key ( " worker_address " ) )
self . assertTrue ( task . has_key ( " operationid " ) )
2016-07-06 00:25:02 -07:00
self . assertTrue ( " Test function 1 intentionally failed. " in task . get ( " error_message " ) )
2016-06-24 19:43:24 -07:00
self . assertTrue ( task [ " operationid " ] not in task_ids )
task_ids . add ( task [ " operationid " ] )
2016-06-18 01:01:48 +03:00
2016-06-25 09:43:57 -07:00
x = test_functions . throw_exception_fct2 ( )
try :
ray . get ( x )
except Exception as e :
2016-07-06 00:25:02 -07:00
self . assertTrue ( " Test function 2 intentionally failed. " in str ( e ) )
2016-06-25 09:43:57 -07:00
else :
self . assertTrue ( False ) # ray.get should throw an exception
x , y , z = test_functions . throw_exception_fct3 ( 1.0 )
for ref in [ x , y , z ] :
try :
ray . get ( ref )
except Exception as e :
2016-07-06 00:25:02 -07:00
self . assertTrue ( " Test function 3 intentionally failed. " in str ( e ) )
2016-06-25 09:43:57 -07:00
else :
self . assertTrue ( False ) # ray.get should throw an exception
2016-06-23 12:58:48 -07:00
def check_get_deallocated ( data ) :
x = ray . put ( data )
ray . get ( x )
2016-06-21 17:39:48 -07:00
return x . val
2016-06-23 12:58:48 -07:00
def check_get_not_deallocated ( data ) :
x = ray . put ( data )
y = ray . get ( x )
2016-06-21 17:39:48 -07:00
return y , x . val
2016-04-18 13:05:36 -07:00
class ReferenceCountingTest ( unittest . TestCase ) :
def testDeallocation ( self ) :
2016-06-19 22:01:31 -07:00
worker_path = os . path . join ( os . path . dirname ( os . path . abspath ( __file__ ) ) , " test_worker.py " )
2016-06-27 11:57:22 -07:00
services . start_ray_local ( num_workers = 3 , worker_path = worker_path )
2016-04-18 13:05:36 -07:00
x = test_functions . test_alias_f ( )
2016-06-23 12:58:48 -07:00
ray . get ( x )
2016-04-18 13:05:36 -07:00
time . sleep ( 0.1 )
objref_val = x . val
2016-06-10 14:12:15 -07:00
self . assertTrue ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val ] == 1 )
2016-04-18 13:05:36 -07:00
del x
2016-06-10 14:12:15 -07:00
self . assertTrue ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val ] == - 1 ) # -1 indicates deallocated
2016-04-18 13:05:36 -07:00
y = test_functions . test_alias_h ( )
2016-06-23 12:58:48 -07:00
ray . get ( y )
2016-04-18 13:05:36 -07:00
time . sleep ( 0.1 )
objref_val = y . val
2016-06-10 14:12:15 -07:00
self . assertTrue ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val : ( objref_val + 3 ) ] == [ 1 , 0 , 0 ] )
2016-04-18 13:05:36 -07:00
del y
2016-06-10 14:12:15 -07:00
self . assertTrue ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val : ( objref_val + 3 ) ] == [ - 1 , - 1 , - 1 ] )
2016-04-18 13:05:36 -07:00
2016-06-05 21:57:37 -07:00
z = da . zeros ( [ da . BLOCK_SIZE , 2 * da . BLOCK_SIZE ] , " float " )
2016-04-18 13:05:36 -07:00
time . sleep ( 0.1 )
objref_val = z . val
2016-06-10 14:12:15 -07:00
self . assertTrue ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val : ( objref_val + 3 ) ] == [ 1 , 1 , 1 ] )
2016-04-18 13:05:36 -07:00
del z
time . sleep ( 0.1 )
2016-06-10 14:12:15 -07:00
self . assertTrue ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val : ( objref_val + 3 ) ] == [ - 1 , - 1 , - 1 ] )
2016-04-18 13:05:36 -07:00
2016-06-05 21:57:37 -07:00
x = ra . zeros ( [ 10 , 10 ] , " float " )
y = ra . zeros ( [ 10 , 10 ] , " float " )
z = ra . dot ( x , y )
2016-04-18 13:05:36 -07:00
objref_val = x . val
time . sleep ( 0.1 )
2016-06-10 14:12:15 -07:00
self . assertTrue ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val : ( objref_val + 3 ) ] == [ 1 , 1 , 1 ] )
2016-04-18 13:05:36 -07:00
del x
time . sleep ( 0.1 )
2016-06-10 14:12:15 -07:00
self . assertTrue ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val : ( objref_val + 3 ) ] == [ - 1 , 1 , 1 ] )
2016-04-18 13:05:36 -07:00
del y
time . sleep ( 0.1 )
2016-06-10 14:12:15 -07:00
self . assertTrue ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val : ( objref_val + 3 ) ] == [ - 1 , - 1 , 1 ] )
2016-04-18 13:05:36 -07:00
del z
time . sleep ( 0.1 )
2016-06-10 14:12:15 -07:00
self . assertTrue ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val : ( objref_val + 3 ) ] == [ - 1 , - 1 , - 1 ] )
2016-04-18 13:05:36 -07:00
services . cleanup ( )
2016-06-23 12:58:48 -07:00
def testGet ( self ) :
2016-06-21 17:39:48 -07:00
worker_path = os . path . join ( os . path . dirname ( os . path . abspath ( __file__ ) ) , " test_worker.py " )
2016-06-27 11:57:22 -07:00
services . start_ray_local ( num_workers = 3 , worker_path = worker_path )
2016-06-21 17:39:48 -07:00
for val in RAY_TEST_OBJECTS + [ np . zeros ( ( 2 , 2 ) ) , UserDefinedType ( ) ] :
2016-06-23 12:58:48 -07:00
objref_val = check_get_deallocated ( val )
2016-06-21 17:39:48 -07:00
self . assertEqual ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val ] , - 1 )
2016-06-23 18:39:02 -07:00
if not isinstance ( val , bool ) and not isinstance ( val , np . generic ) and val is not None :
2016-06-23 12:58:48 -07:00
x , objref_val = check_get_not_deallocated ( val )
2016-06-21 17:39:48 -07:00
self . assertEqual ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val ] , 1 )
2016-06-24 16:56:26 -07:00
# The following currently segfaults: The second "result = " closes the
# memory segment as soon as the assignment is done (and the first result
# goes out of scope).
"""
data = np . zeros ( [ 10 , 20 ] )
objref = ray . put ( data )
result = worker . get ( objref )
result = worker . get ( objref )
self . assertTrue ( np . alltrue ( result == data ) )
"""
2016-06-21 17:39:48 -07:00
services . cleanup ( )
@unittest.expectedFailure
2016-06-23 12:58:48 -07:00
def testGetFailing ( self ) :
2016-06-21 17:39:48 -07:00
worker_path = os . path . join ( os . path . dirname ( os . path . abspath ( __file__ ) ) , " test_worker.py " )
2016-06-27 11:57:22 -07:00
services . start_ray_local ( num_workers = 3 , worker_path = worker_path )
2016-06-21 17:39:48 -07:00
# This is failing, because for bool and None, we cannot track python
# refcounts and therefore cannot keep the refcount up
# (see 5281bd414f6b404f61e1fe25ec5f6651defee206).
# The resulting behavior is still correct however because True, False and
2016-06-23 12:58:48 -07:00
# None are returned by get "by value" and therefore can be reclaimed from
2016-06-21 17:39:48 -07:00
# the object store safely.
for val in [ True , False , None ] :
2016-06-23 12:58:48 -07:00
x , objref_val = check_get_not_deallocated ( val )
2016-06-21 17:39:48 -07:00
self . assertEqual ( ray . scheduler_info ( ) [ " reference_counts " ] [ objref_val ] , 1 )
services . cleanup ( )
2016-06-26 13:43:54 -07:00
class PythonModeTest ( unittest . TestCase ) :
def testObjRefAliasing ( self ) :
2016-06-27 11:57:22 -07:00
services . start_ray_local ( driver_mode = ray . PYTHON_MODE )
2016-06-26 13:43:54 -07:00
xref = test_functions . test_alias_h ( )
self . assertTrue ( np . alltrue ( xref == np . ones ( [ 3 , 4 , 5 ] ) ) ) # remote functions should return by value
self . assertTrue ( np . alltrue ( xref == ray . get ( xref ) ) ) # ray.get should be the identity
y = np . random . normal ( size = [ 11 , 12 ] )
self . assertTrue ( np . alltrue ( y == ray . put ( y ) ) ) # ray.put should be the identity
# make sure objects are immutable, this example is why we need to copy
# arguments before passing them into remote functions in python mode
aref = test_functions . python_mode_f ( )
self . assertTrue ( np . alltrue ( aref == np . array ( [ 0 , 0 ] ) ) )
bref = test_functions . python_mode_g ( aref )
self . assertTrue ( np . alltrue ( aref == np . array ( [ 0 , 0 ] ) ) ) # python_mode_g should not mutate aref
self . assertTrue ( np . alltrue ( bref == np . array ( [ 1 , 0 ] ) ) )
services . cleanup ( )
2016-06-24 19:43:24 -07:00
if __name__ == " __main__ " :
2016-02-22 13:55:06 -08:00
unittest . main ( )