diff --git a/src/common/Makefile b/src/common/Makefile index 689c4c901..d2efda314 100644 --- a/src/common/Makefile +++ b/src/common/Makefile @@ -23,7 +23,7 @@ $(BUILD)/io_tests: test/io_tests.c $(BUILD)/libcommon.a $(CC) -o $@ $^ $(CFLAGS) $(BUILD)/task_tests: test/task_tests.c $(BUILD)/libcommon.a - $(CC) -o $@ $^ $(CFLAGS) + $(CC) -o $@ $^ thirdparty/hiredis/libhiredis.a $(CFLAGS) $(BUILD)/redis_tests: hiredis test/redis_tests.c $(BUILD)/libcommon.a logging.h $(CC) -o $@ test/redis_tests.c logging.c $(BUILD)/libcommon.a thirdparty/hiredis/libhiredis.a $(CFLAGS) @@ -40,7 +40,14 @@ hiredis: test: hiredis redis $(BUILD)/common_tests $(BUILD)/task_log_tests $(BUILD)/object_table_tests $(BUILD)/db_tests $(BUILD)/io_tests $(BUILD)/task_tests $(BUILD)/redis_tests FORCE ./thirdparty/redis-3.2.3/src/redis-server & - sleep 1s ; ./build/common_tests ; ./build/db_tests ; ./build/task_log_tests ; ./build/object_table_tests ; ./build/io_tests ; ./build/task_tests ; ./build/redis_tests + sleep 1s + ./build/common_tests + ./build/db_tests + ./build/io_tests + ./build/task_tests + ./build/redis_tests + ./build/task_log_tests + ./build/object_table_tests valgrind: test valgrind --leak-check=full --error-exitcode=1 ./build/common_tests diff --git a/src/common/common.h b/src/common/common.h index 927534327..c3cef0d29 100644 --- a/src/common/common.h +++ b/src/common/common.h @@ -44,12 +44,6 @@ #define UNIQUE_ID_SIZE 20 -/* Cleanup method for running tests with the greatest library. - * Runs the test, then clears the Redis database. */ -#define RUN_REDIS_TEST(context, test) \ - RUN_TEST(test); \ - freeReplyObject(redisCommand(context, "FLUSHALL")); - typedef struct { unsigned char id[UNIQUE_ID_SIZE]; } unique_id; extern const unique_id NIL_ID; diff --git a/src/common/io.c b/src/common/io.c index 1f1125bc3..b71e575a7 100644 --- a/src/common/io.c +++ b/src/common/io.c @@ -25,10 +25,11 @@ * write_message and read_message methods. * * @param port The port to bind to. 
+ * @param shall_listen Are we also starting to listen on the socket? * @return A non-blocking file descriptor for the socket, or -1 if an error * occurs. */ -int bind_inet_sock(const int port) { +int bind_inet_sock(const int port, bool shall_listen) { struct sockaddr_in name; int socket_fd = socket(PF_INET, SOCK_STREAM, 0); if (socket_fd < 0) { @@ -55,7 +56,7 @@ int bind_inet_sock(const int port) { close(socket_fd); return -1; } - if (listen(socket_fd, 5) == -1) { + if (shall_listen && listen(socket_fd, 5) == -1) { LOG_ERR("Could not listen to socket %d", port); close(socket_fd); return -1; @@ -68,10 +69,11 @@ int bind_inet_sock(const int port) { * pathname. Removes any existing file at the pathname. * * @param socket_pathname The pathname for the socket. + * @param shall_listen Are we also starting to listen on the socket? * @return A blocking file descriptor for the socket, or -1 if an error * occurs. */ -int bind_ipc_sock(const char *socket_pathname) { +int bind_ipc_sock(const char *socket_pathname, bool shall_listen) { struct sockaddr_un socket_address; int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0); if (socket_fd < 0) { @@ -104,7 +106,7 @@ int bind_ipc_sock(const char *socket_pathname) { close(socket_fd); return -1; } - if (listen(socket_fd, 5) == -1) { + if (shall_listen && listen(socket_fd, 5) == -1) { LOG_ERR("Could not listen to socket %s", socket_pathname); close(socket_fd); return -1; diff --git a/src/common/io.h b/src/common/io.h index 362b85fc1..81ea160c4 100644 --- a/src/common/io.h +++ b/src/common/io.h @@ -1,6 +1,7 @@ #ifndef IO_H #define IO_H +#include #include enum common_message_type { @@ -14,8 +15,8 @@ enum common_message_type { /* Helper functions for socket communication. 
*/ -int bind_inet_sock(const int port); -int bind_ipc_sock(const char *socket_pathname); +int bind_inet_sock(const int port, bool shall_listen); +int bind_ipc_sock(const char *socket_pathname, bool shall_listen); int connect_ipc_sock(const char *socket_pathname); int accept_client(int socket_fd); @@ -29,4 +30,4 @@ void write_log_message(int fd, char *message); void write_formatted_log_message(int fd, const char *format, ...); char *read_log_message(int fd); -#endif +#endif /* IO_H */ diff --git a/src/common/test/db_tests.c b/src/common/test/db_tests.c index bc0bfa6ae..9bf3c96db 100644 --- a/src/common/test/db_tests.c +++ b/src/common/test/db_tests.c @@ -5,7 +5,7 @@ #include #include "event_loop.h" -#include "test/example_task.h" +#include "test_common.h" #include "state/db.h" #include "state/object_table.h" #include "state/task_log.h" @@ -197,13 +197,10 @@ TEST unique_client_id_test(void) { } SUITE(db_tests) { - redisContext *context = redisConnect("127.0.0.1", 6379); - freeReplyObject(redisCommand(context, "FLUSHALL")); - RUN_REDIS_TEST(context, object_table_lookup_test); - RUN_REDIS_TEST(context, task_log_test); - RUN_REDIS_TEST(context, task_log_all_test); - RUN_REDIS_TEST(context, unique_client_id_test); - redisFree(context); + RUN_REDIS_TEST(object_table_lookup_test); + RUN_REDIS_TEST(task_log_test); + RUN_REDIS_TEST(task_log_all_test); + RUN_REDIS_TEST(unique_client_id_test); } GREATEST_MAIN_DEFS(); diff --git a/src/common/test/io_tests.c b/src/common/test/io_tests.c index 56ebf0607..e342df914 100644 --- a/src/common/test/io_tests.c +++ b/src/common/test/io_tests.c @@ -11,7 +11,7 @@ SUITE(io_tests); TEST ipc_socket_test(void) { const char *socket_pathname = "test-socket"; - int socket_fd = bind_ipc_sock(socket_pathname); + int socket_fd = bind_ipc_sock(socket_pathname, true); ASSERT(socket_fd >= 0); char *test_string = "hello world"; @@ -50,7 +50,7 @@ TEST ipc_socket_test(void) { TEST long_ipc_socket_test(void) { const char *socket_pathname = 
"long-test-socket"; - int socket_fd = bind_ipc_sock(socket_pathname); + int socket_fd = bind_ipc_sock(socket_pathname, true); ASSERT(socket_fd >= 0); UT_string *test_string; diff --git a/src/common/test/object_table_tests.c b/src/common/test/object_table_tests.c index 3345ac50f..e0e496af9 100644 --- a/src/common/test/object_table_tests.c +++ b/src/common/test/object_table_tests.c @@ -1,7 +1,7 @@ #include "greatest.h" #include "event_loop.h" -#include "example_task.h" +#include "test_common.h" #include "common.h" #include "state/object_table.h" #include "state/redis.h" diff --git a/src/common/test/redis_tests.c b/src/common/test/redis_tests.c index e756f871c..4caec7a53 100644 --- a/src/common/test/redis_tests.c +++ b/src/common/test/redis_tests.c @@ -10,6 +10,7 @@ #include "state/redis.h" #include "io.h" #include "logging.h" +#include "test_common.h" SUITE(redis_tests); @@ -40,7 +41,7 @@ TEST redis_socket_test(void) { const char *socket_pathname = "redis-test-socket"; redisContext *context = redisConnect("127.0.0.1", 6379); ASSERT(context != NULL); - int socket_fd = bind_ipc_sock(socket_pathname); + int socket_fd = bind_ipc_sock(socket_pathname, true); ASSERT(socket_fd >= 0); int client_fd = connect_ipc_sock(socket_pathname); @@ -97,7 +98,7 @@ TEST async_redis_socket_test(void) { /* Start IPC channel. */ const char *socket_pathname = "async-redis-test-socket"; - int socket_fd = bind_ipc_sock(socket_pathname); + int socket_fd = bind_ipc_sock(socket_pathname, true); ASSERT(socket_fd >= 0); utarray_push_back(connections, &socket_fd); @@ -171,7 +172,7 @@ TEST logging_test(void) { /* Start IPC channel. 
*/ const char *socket_pathname = "logging-test-socket"; - int socket_fd = bind_ipc_sock(socket_pathname); + int socket_fd = bind_ipc_sock(socket_pathname, true); ASSERT(socket_fd >= 0); utarray_push_back(connections, &socket_fd); @@ -208,12 +209,9 @@ TEST logging_test(void) { } SUITE(redis_tests) { - redisContext *context = redisConnect("127.0.0.1", 6379); - freeReplyObject(redisCommand(context, "FLUSHALL")); - RUN_REDIS_TEST(context, redis_socket_test); - RUN_REDIS_TEST(context, async_redis_socket_test); - RUN_REDIS_TEST(context, logging_test); - redisFree(context); + RUN_REDIS_TEST(redis_socket_test); + RUN_REDIS_TEST(async_redis_socket_test); + RUN_REDIS_TEST(logging_test); } GREATEST_MAIN_DEFS(); diff --git a/src/common/test/task_log_tests.c b/src/common/test/task_log_tests.c index 570cecacf..58c790768 100644 --- a/src/common/test/task_log_tests.c +++ b/src/common/test/task_log_tests.c @@ -1,7 +1,7 @@ #include "greatest.h" #include "event_loop.h" -#include "example_task.h" +#include "test_common.h" #include "common.h" #include "state/object_table.h" #include "state/redis.h" diff --git a/src/common/test/task_tests.c b/src/common/test/task_tests.c index 77f43bbf0..3a7862455 100644 --- a/src/common/test/task_tests.c +++ b/src/common/test/task_tests.c @@ -5,7 +5,7 @@ #include #include "common.h" -#include "test/example_task.h" +#include "test_common.h" #include "task.h" #include "io.h" diff --git a/src/common/test/example_task.h b/src/common/test/test_common.h similarity index 50% rename from src/common/test/example_task.h rename to src/common/test/test_common.h index f752b824d..305d74446 100644 --- a/src/common/test/example_task.h +++ b/src/common/test/test_common.h @@ -1,5 +1,7 @@ -#ifndef EXAMPLE_TASK_H -#define EXAMPLE_TASK_H +#ifndef TEST_COMMON_H +#define TEST_COMMON_H + +#include "hiredis/hiredis.h" #include "task.h" @@ -20,4 +22,18 @@ task_instance *example_task_instance(void) { return instance; } -#endif +/* Flush redis. 
*/
+void flushall_redis(void) {
+  redisContext *context = redisConnect("127.0.0.1", 6379);
+  if (context == NULL) return; /* Allocation failure: nothing to flush. */
+  if (!context->err) {
+    freeReplyObject(redisCommand(context, "FLUSHALL"));
+  }
+  redisFree(context);
+}
+
+/* Flush Redis, run the given greatest test, then flush Redis again. */
+#define RUN_REDIS_TEST(test) \
+  do { flushall_redis(); RUN_TEST(test); flushall_redis(); } while (0)
+
+#endif /* TEST_COMMON_H */
diff --git a/src/photon/photon_scheduler.c b/src/photon/photon_scheduler.c
index fa96e4e8a..1b3474e62 100644
--- a/src/photon/photon_scheduler.c
+++ b/src/photon/photon_scheduler.c
@@ -176,7 +176,7 @@ void start_server(const char *socket_name,
                   const char *redis_addr,
                   int redis_port,
                   const char *plasma_socket_name) {
-  int fd = bind_ipc_sock(socket_name);
+  int fd = bind_ipc_sock(socket_name, true);
   event_loop *loop = event_loop_create();
   g_state =
       init_local_scheduler(loop, redis_addr, redis_port, plasma_socket_name);
diff --git a/src/plasma/plasma_manager.c b/src/plasma/plasma_manager.c
index e48d90e34..614bd38c9 100644
--- a/src/plasma/plasma_manager.c
+++ b/src/plasma/plasma_manager.c
@@ -903,16 +903,23 @@ void start_server(const char *store_socket_name,
                   int port,
                   const char *db_addr,
                   int db_port) {
+  /* Bind the sockets before we try to connect to the plasma store.
+   * In case the bind does not succeed, we want to be able to exit
+   * without breaking the pipe to the store. 
*/ + int remote_sock = bind_inet_sock(port, false); + if (remote_sock < 0) { + exit(EXIT_COULD_NOT_BIND_PORT); + } + + int local_sock = bind_ipc_sock(manager_socket_name, false); + CHECKM(local_sock >= 0, "Unable to bind local manager socket"); + g_manager_state = init_plasma_manager_state(store_socket_name, master_addr, port, db_addr, db_port); CHECK(g_manager_state); - int remote_sock = bind_inet_sock(port); - if (remote_sock < 0) { - exit(EXIT_COULD_NOT_BIND_PORT); - } - int local_sock = bind_ipc_sock(manager_socket_name); - CHECKM(local_sock >= 0, "Unable to bind local manager socket"); + CHECK(listen(remote_sock, 5) != -1); + CHECK(listen(local_sock, 5) != -1); LOG_DEBUG("Started server connected to store %s, listening on port %d", store_socket_name, port); diff --git a/src/plasma/plasma_store.c b/src/plasma/plasma_store.c index c6702c3cf..0e7d404d0 100644 --- a/src/plasma/plasma_store.c +++ b/src/plasma/plasma_store.c @@ -497,7 +497,7 @@ void signal_handler(int signal) { void start_server(char *socket_name) { event_loop *loop = event_loop_create(); plasma_store_state *state = init_plasma_store(loop); - int socket = bind_ipc_sock(socket_name); + int socket = bind_ipc_sock(socket_name, true); CHECK(socket >= 0); event_loop_add_file(loop, socket, EVENT_LOOP_READ, new_client_connection, state); diff --git a/src/plasma/test/manager_tests.c b/src/plasma/test/manager_tests.c index c5226dcef..424f79206 100644 --- a/src/plasma/test/manager_tests.c +++ b/src/plasma/test/manager_tests.c @@ -9,6 +9,7 @@ #include "common.h" #include "event_loop.h" #include "io.h" +#include "utstring.h" #include "plasma.h" #include "plasma_client.h" @@ -17,9 +18,6 @@ SUITE(plasma_manager_tests); const char *manager_addr = "127.0.0.1"; -int manager_port = 12345; -const char *store_socket_name = "/tmp/store12345"; -const char *manager_socket_name = "/tmp/manager12345"; object_id oid; void wait_for_pollin(int fd) { @@ -30,6 +28,38 @@ void wait_for_pollin(int fd) { CHECK(retval > 0); } 
+/* Bind a listening IPC socket at a random /tmp path (up to 5 attempts).
+ * *fd gets the descriptor (< 0 on failure); caller frees the returned name. */
+UT_string *bind_ipc_sock_retry(int *fd) {
+  UT_string *socket_name = NULL;
+  for (int attempt = 0; attempt < 5; ++attempt) {
+    LOG_INFO("trying to find plasma socket (attempt %d)", attempt);
+    utstring_renew(socket_name);
+    utstring_printf(socket_name, "/tmp/plasma_socket_%d", rand());
+    *fd = bind_ipc_sock(utstring_body(socket_name), true);
+    if (*fd >= 0) {
+      break;
+    }
+    /* Sleep for 100ms before trying another name. */
+    usleep(100000);
+  }
+  return socket_name;
+}
+
+/* Bind a listening TCP socket on a random port in [10000, 50000).
+ * Returns the port with the descriptor in *fd, or -1 after 5 failed tries. */
+int bind_inet_sock_retry(int *fd) {
+  for (int attempt = 0; attempt < 5; ++attempt) {
+    int port = 10000 + rand() % 40000;
+    *fd = bind_inet_sock(port, true);
+    if (*fd >= 0) {
+      return port;
+    }
+    usleep(100000); /* Sleep for 100ms before trying another port. */
+  }
+  return -1;
+}
+
 int test_done_handler(event_loop *loop, timer_id id, void *context) {
   event_loop_stop(loop);
   return AE_NOMORE;
@@ -54,20 +84,22 @@ typedef struct {
   client_connection *client_conn;
 } plasma_mock;
 
-plasma_mock *init_plasma_mock(int port, plasma_mock *remote_mock) {
+plasma_mock *init_plasma_mock(plasma_mock *remote_mock) {
   plasma_mock *mock = malloc(sizeof(plasma_mock));
   /* Start listening on all the ports and initiate the local plasma manager. 
*/
-  mock->port = port;
-  mock->manager_remote_fd = bind_inet_sock(port);
-  mock->manager_local_fd = bind_ipc_sock(manager_socket_name);
-  mock->local_store = bind_ipc_sock(store_socket_name);
-  mock->state =
-      init_plasma_manager_state(store_socket_name, manager_addr, port, NULL, 0);
+  mock->port = bind_inet_sock_retry(&mock->manager_remote_fd);
+  UT_string *store_socket_name = bind_ipc_sock_retry(&mock->local_store);
+  UT_string *manager_socket_name = bind_ipc_sock_retry(&mock->manager_local_fd);
+
+  CHECK(mock->manager_remote_fd >= 0 && mock->manager_local_fd >= 0 &&
+        mock->local_store >= 0);
+  mock->state = init_plasma_manager_state(utstring_body(store_socket_name),
+                                          manager_addr, mock->port, NULL, 0);
   mock->loop = get_event_loop(mock->state);
   /* Accept a connection from the local manager on the remote manager. */
   if (remote_mock != NULL) {
     mock->write_conn =
-        get_manager_connection(remote_mock->state, manager_addr, port);
+        get_manager_connection(remote_mock->state, manager_addr, mock->port);
     wait_for_pollin(mock->manager_remote_fd);
     mock->read_conn = new_client_connection(mock->loop, mock->manager_remote_fd,
                                             mock->state, 0);
@@ -77,10 +109,13 @@ plasma_mock *init_plasma_mock(int port, plasma_mock *remote_mock) {
   }
   /* Connect a new client to the local plasma manager and mock a request to an
    * object. */
-  mock->plasma_conn = plasma_connect(store_socket_name, manager_socket_name);
+  mock->plasma_conn = plasma_connect(utstring_body(store_socket_name),
+                                     utstring_body(manager_socket_name));
   wait_for_pollin(mock->manager_local_fd);
   mock->client_conn = new_client_connection(mock->loop, mock->manager_local_fd,
                                             mock->state, 0);
+  utstring_free(store_socket_name);
+  utstring_free(manager_socket_name);
   return mock;
 }
 
@@ -109,10 +144,13 @@ void destroy_plasma_mock(plasma_mock *mock) {
  * correct object ID. 
*/ TEST request_transfer_test(void) { - plasma_mock *local_mock = init_plasma_mock(manager_port, NULL); - plasma_mock *remote_mock = init_plasma_mock(12346, local_mock); + plasma_mock *local_mock = init_plasma_mock(NULL); + plasma_mock *remote_mock = init_plasma_mock(local_mock); const char **manager_vector = malloc(sizeof(char *)); - manager_vector[0] = "127.0.0.1:12346"; + UT_string *addr = NULL; + utstring_new(addr); + utstring_printf(addr, "127.0.0.1:%d", remote_mock->port); + manager_vector[0] = utstring_body(addr); request_transfer(oid, 1, manager_vector, local_mock->client_conn); event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT, test_done_handler, local_mock->state); @@ -126,6 +164,7 @@ TEST request_transfer_test(void) { ASSERT(req->num_object_ids == 1); ASSERT(memcmp(&oid, &req->object_ids[0], sizeof(object_id)) == 0); /* Clean up. */ + utstring_free(addr); free(req); destroy_plasma_mock(remote_mock); destroy_plasma_mock(local_mock); @@ -145,12 +184,18 @@ TEST request_transfer_test(void) { * with the correct object ID. 
*/ TEST request_transfer_retry_test(void) { - plasma_mock *local_mock = init_plasma_mock(manager_port, NULL); - plasma_mock *remote_mock1 = init_plasma_mock(12346, local_mock); - plasma_mock *remote_mock2 = init_plasma_mock(12347, local_mock); + plasma_mock *local_mock = init_plasma_mock(NULL); + plasma_mock *remote_mock1 = init_plasma_mock(local_mock); + plasma_mock *remote_mock2 = init_plasma_mock(local_mock); const char **manager_vector = malloc(sizeof(char *) * 2); - manager_vector[0] = "127.0.0.1:12346"; - manager_vector[1] = "127.0.0.1:12347"; + UT_string *addr0 = NULL; + utstring_new(addr0); + utstring_printf(addr0, "127.0.0.1:%d", remote_mock1->port); + manager_vector[0] = utstring_body(addr0); + UT_string *addr1 = NULL; + utstring_new(addr1); + utstring_printf(addr1, "127.0.0.1:%d", remote_mock2->port); + manager_vector[1] = utstring_body(addr1); request_transfer(oid, 2, manager_vector, local_mock->client_conn); event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT * 2, test_done_handler, local_mock->state); @@ -165,6 +210,8 @@ TEST request_transfer_retry_test(void) { ASSERT(req->num_object_ids == 1); ASSERT(memcmp(&oid, &req->object_ids[0], sizeof(object_id)) == 0); /* Clean up. */ + utstring_free(addr0); + utstring_free(addr1); free(req); destroy_plasma_mock(remote_mock2); destroy_plasma_mock(remote_mock1); @@ -184,10 +231,13 @@ TEST request_transfer_retry_test(void) { * wasn't fetched. 
*/ TEST request_transfer_timeout_test(void) { - plasma_mock *local_mock = init_plasma_mock(manager_port, NULL); - plasma_mock *remote_mock = init_plasma_mock(12346, local_mock); + plasma_mock *local_mock = init_plasma_mock(NULL); + plasma_mock *remote_mock = init_plasma_mock(local_mock); const char **manager_vector = malloc(sizeof(char *)); - manager_vector[0] = "127.0.0.1:12346"; + UT_string *addr = NULL; + utstring_new(addr); + utstring_printf(addr, "127.0.0.1:%d", remote_mock->port); + manager_vector[0] = utstring_body(addr); request_transfer(oid, 1, manager_vector, local_mock->client_conn); event_loop_add_timer(local_mock->loop, MANAGER_TIMEOUT * (NUM_RETRIES + 2), test_done_handler, local_mock->state); @@ -201,6 +251,7 @@ TEST request_transfer_timeout_test(void) { ASSERT_EQ(memcmp(&oid, &reply.object_ids, sizeof(object_id)), 0); ASSERT_EQ(reply.has_object, 0); /* Clean up. */ + utstring_free(addr); destroy_plasma_mock(remote_mock); destroy_plasma_mock(local_mock); PASS(); @@ -214,8 +265,8 @@ TEST request_transfer_timeout_test(void) { * - Expect to see the same data. */ TEST read_write_object_chunk_test(void) { - plasma_mock *local_mock = init_plasma_mock(manager_port, NULL); - plasma_mock *remote_mock = init_plasma_mock(12346, local_mock); + plasma_mock *local_mock = init_plasma_mock(NULL); + plasma_mock *remote_mock = init_plasma_mock(local_mock); /* Create a mock object buffer to transfer. */ const char *data = "Hello world!"; const int data_size = strlen(data) + 1; diff --git a/src/plasma/test/test.py b/src/plasma/test/test.py index 85544f382..9613126ed 100644 --- a/src/plasma/test/test.py +++ b/src/plasma/test/test.py @@ -276,6 +276,7 @@ class TestPlasmaManager(unittest.TestCase): def tearDown(self): # Kill the PlasmaStore and PlasmaManager processes. if USE_VALGRIND: + time.sleep(1) # give processes opportunity to finish work self.p4.send_signal(signal.SIGTERM) self.p4.wait() self.p5.send_signal(signal.SIGTERM)